diff --git a/CHANGELOG.md b/CHANGELOG.md index f2e0ded423..857eb0f6c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog +## [2.33.0](https://github.com/googleapis/java-bigquerystorage/compare/v2.32.1...v2.33.0) (2023-03-01) + + +### Features + +* Add header back to the client ([#2016](https://github.com/googleapis/java-bigquerystorage/issues/2016)) ([de00447](https://github.com/googleapis/java-bigquerystorage/commit/de00447958e5939d7be9d0f7da02323aabbfed8c)) + + +### Bug Fixes + +* Add client shutdown if request waiting in request queue for too long. ([#2017](https://github.com/googleapis/java-bigquerystorage/issues/2017)) ([91da88b](https://github.com/googleapis/java-bigquerystorage/commit/91da88b0ed914bf55111dd9cef2a3fc4b27c3443)) +* Allow StreamWriter settings to override passed in BQ client setting ([#2001](https://github.com/googleapis/java-bigquerystorage/issues/2001)) ([66db8fe](https://github.com/googleapis/java-bigquerystorage/commit/66db8fed26474076fb5aaca5044d39e11f6ef28d)) +* Catch uncaught exception from append loop and add expoential retry to reconnection ([#2015](https://github.com/googleapis/java-bigquerystorage/issues/2015)) ([35db0fb](https://github.com/googleapis/java-bigquerystorage/commit/35db0fb38a929a8f3e4db30ee173ce5a4af43d64)) +* Remove write_location header pending discussion ([#2021](https://github.com/googleapis/java-bigquerystorage/issues/2021)) ([0941d43](https://github.com/googleapis/java-bigquerystorage/commit/0941d4363daf782e0be81c11fdf6a2fe0ff4d7ac)) + ## [2.32.1](https://github.com/googleapis/java-bigquerystorage/compare/v2.32.0...v2.32.1) (2023-02-22) diff --git a/README.md b/README.md index ee2d3069f1..08735dc791 100644 --- a/README.md +++ b/README.md @@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.8.0') +implementation platform('com.google.cloud:libraries-bom:26.9.0') implementation 'com.google.cloud:google-cloud-bigquerystorage' ``` If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigquerystorage:2.32.0' +implementation 'com.google.cloud:google-cloud-bigquerystorage:2.32.1' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.32.0" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.32.1" ``` ## Authentication diff --git a/google-cloud-bigquerystorage-bom/pom.xml b/google-cloud-bigquerystorage-bom/pom.xml index d61f682280..1119e01e9d 100644 --- a/google-cloud-bigquerystorage-bom/pom.xml +++ b/google-cloud-bigquerystorage-bom/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-bigquerystorage-bom - 2.32.1 + 2.33.0 pom com.google.cloud @@ -52,37 +52,37 @@ com.google.cloud google-cloud-bigquerystorage - 2.32.1 + 2.33.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.156.1 + 0.157.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.156.1 + 0.157.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 2.32.1 + 2.33.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.156.1 + 0.157.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.156.1 + 0.157.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1 - 2.32.1 + 2.33.0 diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index 5914aa2586..a09d94c1a0 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-bigquerystorage - 2.32.1 + 2.33.0 jar BigQuery Storage https://github.com/googleapis/java-bigquerystorage @@ -11,7 +11,7 @@ com.google.cloud google-cloud-bigquerystorage-parent - 2.32.1 + 2.33.0 google-cloud-bigquerystorage diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 8eab8bfdde..6da6950d3a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -18,6 +18,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; @@ -32,6 +33,7 @@ import io.grpc.StatusRuntimeException; import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.util.Comparator; import java.util.Deque; import java.util.HashMap; @@ -65,6 +67,14 @@ class ConnectionWorker implements AutoCloseable { // Maximum wait time on inflight quota before error out. private static long INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = 300000; + /* + * Maximum time waiting for request callback before shutting down the connection. + * + * We will constantly checking how much time we have been waiting for the next request callback + * if we wait too much time we will start shutting down the connections and clean up the queues. + */ + private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15); + private Lock lock; private Condition hasMessageInWaitingQueue; private Condition inflightReduced; @@ -77,6 +87,11 @@ class ConnectionWorker implements AutoCloseable { */ private String streamName; + /* + * The location of this connection. + */ + private String location = null; + /* * The proto schema of rows to write. This schema can change during multiplexing. */ @@ -198,6 +213,12 @@ class ConnectionWorker implements AutoCloseable { */ private final String writerId = UUID.randomUUID().toString(); + /* + * Test only exception behavior testing params. + */ + private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null; + private long testOnlyAppendLoopSleepTime = 0; + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) @@ -205,6 +226,7 @@ public static long getApiMaxRequestBytes() { public ConnectionWorker( String streamName, + String location, ProtoSchema writerSchema, long maxInflightRequests, long maxInflightBytes, @@ -217,6 +239,9 @@ public ConnectionWorker( this.hasMessageInWaitingQueue = lock.newCondition(); this.inflightReduced = lock.newCondition(); this.streamName = streamName; + if (location != null && !location.isEmpty()) { + this.location = location; + } this.maxRetryDuration = maxRetryDuration; if (writerSchema == null) { throw new StatusRuntimeException( @@ -230,6 +255,16 @@ public ConnectionWorker( this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); // Always recreate a client for connection worker. + HashMap newHeaders = new HashMap<>(); + newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); + if (this.location == null) { + newHeaders.put("x-goog-request-params", "write_stream=" + this.streamName); + } + BigQueryWriteSettings stubSettings = + clientSettings + .toBuilder() + .setHeaderProvider(FixedHeaderProvider.create(newHeaders)) + .build(); this.client = BigQueryWriteClient.create(clientSettings); this.appendThread = @@ -240,6 +275,24 @@ public void run() { appendLoop(); } }); + appendThread.setUncaughtExceptionHandler( + (Thread t, Throwable e) -> { + log.warning( + "Exception thrown from append loop, thus stream writer is shutdown due to exception: " + + e.toString()); + lock.lock(); + try { + connectionFinalStatus = e; + // Move all current waiting requests to in flight queue. + while (!this.waitingRequestQueue.isEmpty()) { + AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); + this.inflightRequestQueue.addLast(requestWrapper); + } + } finally { + lock.unlock(); + } + cleanupInflightRequests(); + }); this.appendThread.start(); } @@ -249,6 +302,8 @@ private void resetConnection() { // It's safe to directly close the previous connection as the in flight messages // will be picked up by the next connection. this.streamConnection.close(); + Uninterruptibles.sleepUninterruptibly( + calculateSleepTimeMilli(conectionRetryCountWithoutCallback), TimeUnit.MILLISECONDS); } this.streamConnection = new StreamConnection( @@ -270,6 +325,24 @@ public void run(Throwable finalStatus) { /** Schedules the writing of rows at given offset. */ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) { + if (this.location != null && this.location != streamWriter.getLocation()) { + throw new StatusRuntimeException( + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription( + "StreamWriter with location " + + streamWriter.getLocation() + + " is scheduled to use a connection with location " + + this.location)); + } else if (this.location == null && streamWriter.getStreamName() != this.streamName) { + // Location is null implies this is non-multiplexed connection. + throw new StatusRuntimeException( + Status.fromCode(Code.INVALID_ARGUMENT) + .withDescription( + "StreamWriter with stream name " + + streamWriter.getStreamName() + + " is scheduled to use a connection with stream name " + + this.streamName)); + } Preconditions.checkNotNull(streamWriter); AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); requestBuilder.setProtoRows( @@ -295,6 +368,10 @@ Boolean isUserClosed() { } } + String getWriteLocation() { + return this.location; + } + private ApiFuture appendInternal( StreamWriter streamWriter, AppendRowsRequest message) { AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message, streamWriter); @@ -391,6 +468,22 @@ private void maybeWaitForInflightQuota() { inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); } + @VisibleForTesting + static long calculateSleepTimeMilli(long retryCount) { + return Math.min((long) Math.pow(2, retryCount), 60000); + } + + @VisibleForTesting + void setTestOnlyAppendLoopSleepTime(long testOnlyAppendLoopSleepTime) { + this.testOnlyAppendLoopSleepTime = testOnlyAppendLoopSleepTime; + } + + @VisibleForTesting + void setTestOnlyRunTimeExceptionInAppendLoop( + RuntimeException testOnlyRunTimeExceptionInAppendLoop) { + this.testOnlyRunTimeExceptionInAppendLoop = testOnlyRunTimeExceptionInAppendLoop; + } + public long getInflightWaitSeconds() { return inflightWaitSec.longValue(); } @@ -420,7 +513,7 @@ public void close() { } finally { this.lock.unlock(); } - log.fine("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId); + log.info("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId); try { appendThread.join(); } catch (InterruptedException e) { @@ -438,6 +531,7 @@ public void close() { // Backend request has a 2 minute timeout, so wait a little longer than that. this.client.awaitTermination(150, TimeUnit.SECONDS); } catch (InterruptedException ignored) { + log.warning("Client await termination timeout in writer id " + writerId); } try { @@ -482,6 +576,11 @@ private void appendLoop() { this.lock.lock(); try { hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS); + // Check whether we should error out the current append loop. + if (inflightRequestQueue.size() > 0) { + throwIfWaitCallbackTooLong(inflightRequestQueue.getFirst().requestCreationTimeStamp); + } + // Copy the streamConnectionIsConnected guarded by lock to a local variable. // In addition, only reconnect if there is a retriable error. streamNeedsConnecting = !streamConnectionIsConnected && connectionFinalStatus == null; @@ -496,6 +595,7 @@ private void appendLoop() { } while (!this.waitingRequestQueue.isEmpty()) { AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); + requestWrapper.trySetRequestInsertQueueTime(); this.inflightRequestQueue.addLast(requestWrapper); localQueue.addLast(requestWrapper); } @@ -524,6 +624,10 @@ private void appendLoop() { } finally { lock.unlock(); } + if (testOnlyRunTimeExceptionInAppendLoop != null) { + Uninterruptibles.sleepUninterruptibly(testOnlyAppendLoopSleepTime, TimeUnit.MILLISECONDS); + throw testOnlyRunTimeExceptionInAppendLoop; + } resetConnection(); // Set firstRequestInConnection to indicate the next request to be sent should include // metedata. Reset everytime after reconnection. @@ -612,6 +716,17 @@ private void appendLoop() { log.info("Append thread is done. Stream: " + streamName + " id: " + writerId); } + private void throwIfWaitCallbackTooLong(Instant timeToCheck) { + Duration milliSinceLastCallback = Duration.between(timeToCheck, Instant.now()); + if (milliSinceLastCallback.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) > 0) { + throw new RuntimeException( + String.format( + "Request has waited in inflight queue for %sms for writer %s, " + + "which is over maximum wait time %s", + milliSinceLastCallback, writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME.toString())); + } + } + /* * Returns true if waiting queue is drain, a.k.a. no more requests in the waiting queue. * @@ -649,6 +764,7 @@ private void waitForDoneCallback(long duration, TimeUnit timeUnit) { } this.lock.lock(); try { + log.warning("Donecallback is not triggered within timeout frame for writer " + writerId); if (connectionFinalStatus == null) { connectionFinalStatus = new StatusRuntimeException( @@ -792,7 +908,7 @@ private boolean isConnectionErrorRetriable(Throwable t) { } private void doneCallback(Throwable finalStatus) { - log.fine( + log.info( "Received done callback. Stream: " + streamName + " worker id: " @@ -832,7 +948,9 @@ private void doneCallback(Throwable finalStatus) { "Connection finished with error " + finalStatus.toString() + " for stream " - + streamName); + + streamName + + " with write id: " + + writerId); } } } finally { @@ -864,12 +982,21 @@ static final class AppendRequestAndResponse { // The writer that issues the call of the request. final StreamWriter streamWriter; + Instant requestCreationTimeStamp; + AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter) { this.appendResult = SettableApiFuture.create(); this.message = message; this.messageSize = message.getProtoRows().getSerializedSize(); this.streamWriter = streamWriter; } + + void trySetRequestInsertQueueTime() { + // Only set the first time the caller tries to set the timestamp. + if (requestCreationTimeStamp == null) { + requestCreationTimeStamp = Instant.now(); + } + } } /** Returns the current workload of this worker. */ @@ -960,6 +1087,11 @@ static void setMaxInflightQueueWaitTime(long waitTime) { INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = waitTime; } + @VisibleForTesting + static void setMaxInflightRequestWaitTime(Duration waitTime) { + MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime; + } + @AutoValue abstract static class TableSchemaAndTimestamp { diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 8fcb84165e..83be8ce52a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -288,7 +288,8 @@ private ConnectionWorker createOrReuseConnectionWorker( String streamReference = streamWriter.getStreamName(); if (connectionWorkerPool.size() < currentMaxConnectionCount) { // Always create a new connection if we haven't reached current maximum. - return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema()); + return createConnectionWorker( + streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema()); } else { ConnectionWorker existingBestConnection = pickBestLoadConnection( @@ -304,7 +305,10 @@ private ConnectionWorker createOrReuseConnectionWorker( if (currentMaxConnectionCount > settings.maxConnectionsPerRegion()) { currentMaxConnectionCount = settings.maxConnectionsPerRegion(); } - return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema()); + return createConnectionWorker( + streamWriter.getStreamName(), + streamWriter.getLocation(), + streamWriter.getProtoSchema()); } else { // Stick to the original connection if all the connections are overwhelmed. if (existingConnectionWorker != null) { @@ -359,8 +363,8 @@ static ConnectionWorker pickBestLoadConnection( * a single stream reference. This is because createConnectionWorker(...) is called via * computeIfAbsent(...) which is at most once per key. */ - private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema writeSchema) - throws IOException { + private ConnectionWorker createConnectionWorker( + String streamName, String location, ProtoSchema writeSchema) throws IOException { if (enableTesting) { // Though atomic integer is super lightweight, add extra if check in case adding future logic. testValueCreateConnectionCount.getAndIncrement(); @@ -368,6 +372,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w ConnectionWorker connectionWorker = new ConnectionWorker( streamName, + location, writeSchema, maxInflightRequests, maxInflightBytes, diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index ff965f0477..b21a52a63d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -208,6 +208,7 @@ private StreamWriter(Builder builder) throws IOException { SingleConnectionOrConnectionPool.ofSingleConnection( new ConnectionWorker( builder.streamName, + builder.location, builder.writerSchema, builder.maxInflightRequest, builder.maxInflightBytes, @@ -308,17 +309,37 @@ static boolean isDefaultStream(String streamName) { return streamMatcher.find(); } - private BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException { + static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException { + BigQueryWriteSettings.Builder settingsBuilder = null; if (builder.client != null) { - return builder.client.getSettings(); + settingsBuilder = builder.client.getSettings().toBuilder(); } else { - return BigQueryWriteSettings.newBuilder() - .setCredentialsProvider(builder.credentialsProvider) - .setTransportChannelProvider(builder.channelProvider) - .setBackgroundExecutorProvider(builder.executorProvider) - .setEndpoint(builder.endpoint) - .build(); + settingsBuilder = + new BigQueryWriteSettings.Builder() + .setTransportChannelProvider( + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setChannelsPerCpu(1) + .build()) + .setCredentialsProvider( + BigQueryWriteSettings.defaultCredentialsProviderBuilder().build()) + .setBackgroundExecutorProvider( + BigQueryWriteSettings.defaultExecutorProviderBuilder().build()) + .setEndpoint(BigQueryWriteSettings.getDefaultEndpoint()); } + if (builder.channelProvider != null) { + settingsBuilder.setTransportChannelProvider(builder.channelProvider); + } + if (builder.credentialsProvider != null) { + settingsBuilder.setCredentialsProvider(builder.credentialsProvider); + } + if (builder.executorProvider != null) { + settingsBuilder.setBackgroundExecutorProvider(builder.executorProvider); + } + if (builder.endpoint != null) { + settingsBuilder.setEndpoint(builder.endpoint); + } + + return settingsBuilder.build(); } // Validate whether the fetched connection pool matched certain properties. @@ -542,16 +563,13 @@ public static final class Builder { private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES; - private String endpoint = BigQueryWriteSettings.getDefaultEndpoint(); + private String endpoint = null; - private TransportChannelProvider channelProvider = - BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); + private TransportChannelProvider channelProvider = null; - private CredentialsProvider credentialsProvider = - BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); + private CredentialsProvider credentialsProvider = null; - private ExecutorProvider executorProvider = - BigQueryWriteSettings.defaultExecutorProviderBuilder().build(); + private ExecutorProvider executorProvider = null; private FlowController.LimitExceededBehavior limitExceededBehavior = FlowController.LimitExceededBehavior.Block; @@ -633,7 +651,8 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { /** {@code ExecutorProvider} to use to create Executor to run background jobs. */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { - this.executorProvider = executorProvider; + this.executorProvider = + Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null."); return this; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 980772b2ff..e558d567c8 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -430,6 +430,7 @@ private StreamWriter getTestStreamWriter(String streamName) throws IOException { return StreamWriter.newBuilder(streamName) .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) + .setLocation("us") .setCredentialsProvider(NoCredentialsProvider.create()) .setChannelProvider(serviceHelper.createChannelProvider()) .build(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 4edf0f3e9d..cb4e05ab20 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -38,6 +38,8 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.logging.Logger; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -45,6 +47,7 @@ @RunWith(JUnit4.class) public class ConnectionWorkerTest { + private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/s1"; private static final String TEST_STREAM_2 = "projects/p2/datasets/d2/tables/t2/streams/s2"; private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; @@ -58,6 +61,7 @@ public class ConnectionWorkerTest { public void setUp() throws Exception { testBigQueryWrite = new FakeBigQueryWrite(); ConnectionWorker.setMaxInflightQueueWaitTime(300000); + ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofMinutes(10)); serviceHelper = new MockServiceHelper( UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite)); @@ -83,10 +87,12 @@ public void testMultiplexedAppendSuccess() throws Exception { StreamWriter sw1 = StreamWriter.newBuilder(TEST_STREAM_1, client) .setWriterSchema(createProtoSchema("foo")) + .setLocation("us") .build(); StreamWriter sw2 = StreamWriter.newBuilder(TEST_STREAM_2, client) .setWriterSchema(createProtoSchema("complicate")) + .setLocation("us") .build(); // We do a pattern of: // send to stream1, string1 @@ -204,11 +210,20 @@ public void testAppendInSameStream_switchSchema() throws Exception { // send to stream1, schema1 // ... StreamWriter sw1 = - StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .build(); StreamWriter sw2 = - StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema2).build(); + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema2) + .build(); StreamWriter sw3 = - StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema3).build(); + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema3) + .build(); for (long i = 0; i < appendCount; i++) { switch ((int) i % 4) { case 0: @@ -304,10 +319,14 @@ public void testAppendInSameStream_switchSchema() throws Exception { public void testAppendButInflightQueueFull() throws Exception { ProtoSchema schema1 = createProtoSchema("foo"); StreamWriter sw1 = - StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .build(); ConnectionWorker connectionWorker = new ConnectionWorker( TEST_STREAM_1, + "us", createProtoSchema("foo"), 6, 100000, @@ -351,6 +370,139 @@ public void testAppendButInflightQueueFull() throws Exception { } } + @Test + public void testThrowExceptionWhileWithinAppendLoop() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setLocation("us") + .setWriterSchema(schema1) + .build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + "us", + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + client.getSettings()); + testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); + ConnectionWorker.setMaxInflightQueueWaitTime(500); + + long appendCount = 10; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + connectionWorker.setTestOnlyRunTimeExceptionInAppendLoop( + new RuntimeException("Any exception can happen.")); + // Sleep 1 second before erroring out. + connectionWorker.setTestOnlyAppendLoopSleepTime(1000L); + + // In total insert 5 requests, + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add( + sendTestMessage( + connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); + assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1); + } + + for (int i = 0; i < appendCount; i++) { + int finalI = i; + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> futures.get(finalI).get().getAppendResult().getOffset().getValue()); + assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen."); + } + + // The future append will directly fail. + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> + sendTestMessage( + connectionWorker, + sw1, + createFooProtoRows(new String[] {String.valueOf(100)}), + 100) + .get()); + assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen."); + } + + @Test + public void testLocationMismatch() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(schema1) + .setLocation("eu") + .build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + "us", + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + client.getSettings()); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + () -> + sendTestMessage( + connectionWorker, + sw1, + createFooProtoRows(new String[] {String.valueOf(0)}), + 0)); + assertEquals( + "INVALID_ARGUMENT: StreamWriter with location eu is scheduled to use a connection with location us", + ex.getMessage()); + } + + @Test + public void testStreamNameMismatch() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_2, + null, + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + client.getSettings()); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + () -> + sendTestMessage( + connectionWorker, + sw1, + createFooProtoRows(new String[] {String.valueOf(0)}), + 0)); + assertEquals( + "INVALID_ARGUMENT: StreamWriter with stream name projects/p1/datasets/d1/tables/t1/streams/s1 is scheduled to use a connection with stream name projects/p2/datasets/d2/tables/t2/streams/s2", + ex.getMessage()); + } + + @Test + public void testExponentialBackoff() throws Exception { + assertThat(ConnectionWorker.calculateSleepTimeMilli(0)).isEqualTo(1); + assertThat(ConnectionWorker.calculateSleepTimeMilli(5)).isEqualTo(32); + assertThat(ConnectionWorker.calculateSleepTimeMilli(100)).isEqualTo(60000); + } + private AppendRowsResponse createAppendResponse(long offset) { return AppendRowsResponse.newBuilder() .setAppendResult( @@ -373,6 +525,7 @@ private ConnectionWorker createConnectionWorker( throws IOException { return new ConnectionWorker( streamName, + "us", createProtoSchema("foo"), maxRequests, maxBytes, @@ -455,4 +608,106 @@ public void testLoadIsOverWhelmed() { Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100); assertThat(load2.isOverwhelmed()).isFalse(); } + + @Test + public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofSeconds(1)); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + null, + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + client.getSettings()); + testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3)); + + long appendCount = 10; + for (int i = 0; i < appendCount; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + // In total insert 5 requests, + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add( + sendTestMessage( + connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); + assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1); + } + + for (int i = 0; i < appendCount; i++) { + int finalI = i; + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> futures.get(finalI).get().getAppendResult().getOffset().getValue()); + assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue"); + } + + // The future append will directly fail. + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> + sendTestMessage( + connectionWorker, + sw1, + createFooProtoRows(new String[] {String.valueOf(100)}), + 100) + .get()); + assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue"); + } + + @Test + public void testLongTimeIdleWontFail() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofSeconds(1)); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + null, + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + TEST_TRACE_ID, + client.getSettings()); + + long appendCount = 10; + for (int i = 0; i < appendCount * 2; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + // In total insert 5 requests, + List> futures = new ArrayList<>(); + for (int i = 0; i < appendCount; i++) { + futures.add( + sendTestMessage( + connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i)); + } + // Sleep 2 seconds to make sure request queue is empty. + Thread.sleep(2000); + assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), 0); + for (int i = 0; i < appendCount; i++) { + futures.add( + sendTestMessage( + connectionWorker, + sw1, + createFooProtoRows(new String[] {String.valueOf(i)}), + i + appendCount)); + } + for (int i = 0; i < appendCount * 2; i++) { + assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue()); + } + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 43c5fd2bea..af36273102 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -25,7 +25,10 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.GoogleCredentialsProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; import com.google.api.gax.rpc.AbortedException; @@ -1366,4 +1369,79 @@ public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception { assertTrue(ex.getCause() instanceof InvalidArgumentException); assertFalse(writer.isUserClosed()); } + + @Test(timeout = 10000) + public void testBuilderDefaultSetting() throws Exception { + StreamWriter.Builder writerBuilder = StreamWriter.newBuilder(TEST_STREAM_1); + BigQueryWriteSettings writeSettings = StreamWriter.getBigQueryWriteSettings(writerBuilder); + assertEquals( + BigQueryWriteSettings.defaultExecutorProviderBuilder().build().toString(), + writeSettings.getBackgroundExecutorProvider().toString()); + assertEquals( + BigQueryWriteSettings.defaultCredentialsProviderBuilder().build().toString(), + writeSettings.getCredentialsProvider().toString()); + assertTrue( + writeSettings.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider); + assertEquals( + BigQueryWriteSettings.getDefaultEndpoint(), writeSettings.getEndpoint().toString()); + } + + @Test(timeout = 10000) + public void testBuilderExplicitSetting() throws Exception { + // Client has special seetings. + BigQueryWriteSettings clientSettings = + BigQueryWriteSettings.newBuilder() + .setEndpoint("xxx:345") + .setBackgroundExecutorProvider( + InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build()) + .setTransportChannelProvider(serviceHelper.createChannelProvider()) + .setCredentialsProvider(NoCredentialsProvider.create()) + .build(); + BigQueryWriteClient client = BigQueryWriteClient.create(clientSettings); + StreamWriter.Builder writerWithClient = StreamWriter.newBuilder(TEST_STREAM_1, client); + BigQueryWriteSettings writerSettings = StreamWriter.getBigQueryWriteSettings(writerWithClient); + assertEquals("xxx:345", writerSettings.getEndpoint()); + assertTrue( + writerSettings.getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider); + assertEquals( + 4, + ((InstantiatingExecutorProvider) writerSettings.getBackgroundExecutorProvider()) + .getExecutorThreadCount()); + + // Explicit setting on StreamWriter is respected. + StreamWriter.Builder writerWithClientWithOverrides = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setEndpoint("yyy:345") + .setExecutorProvider( + InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(14).build()) + .setChannelProvider( + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTimeout(Duration.ofSeconds(500)) + .build()) + .setCredentialsProvider( + BigQueryWriteSettings.defaultCredentialsProviderBuilder() + .setScopesToApply(Arrays.asList("A", "B")) + .build()); + BigQueryWriteSettings writerSettings2 = + StreamWriter.getBigQueryWriteSettings(writerWithClientWithOverrides); + assertEquals("yyy:345", writerSettings2.getEndpoint()); + assertTrue( + writerSettings2.getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider); + assertEquals( + 14, + ((InstantiatingExecutorProvider) writerSettings2.getBackgroundExecutorProvider()) + .getExecutorThreadCount()); + assertTrue( + writerSettings2.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider); + assertEquals( + Duration.ofSeconds(500), + ((InstantiatingGrpcChannelProvider) writerSettings2.getTransportChannelProvider()) + .getKeepAliveTimeout()); + assertTrue(writerSettings2.getCredentialsProvider() instanceof GoogleCredentialsProvider); + assertEquals( + 2, + ((GoogleCredentialsProvider) writerSettings2.getCredentialsProvider()) + .getScopesToApply() + .size()); + } } diff --git a/grpc-google-cloud-bigquerystorage-v1/pom.xml b/grpc-google-cloud-bigquerystorage-v1/pom.xml index 2fcca3c433..57d3ac1474 100644 --- a/grpc-google-cloud-bigquerystorage-v1/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 2.32.1 + 2.33.0 grpc-google-cloud-bigquerystorage-v1 GRPC library for grpc-google-cloud-bigquerystorage-v1 com.google.cloud google-cloud-bigquerystorage-parent - 2.32.1 + 2.33.0 diff --git a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml index dad213bb44..0197ad5490 100644 --- a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.156.1 + 0.157.0 grpc-google-cloud-bigquerystorage-v1beta1 GRPC library for grpc-google-cloud-bigquerystorage-v1beta1 com.google.cloud google-cloud-bigquerystorage-parent - 2.32.1 + 2.33.0 diff --git a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml index 7ca8666dd3..e8d9f1075e 100644 --- a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.156.1 + 0.157.0 grpc-google-cloud-bigquerystorage-v1beta2 GRPC library for grpc-google-cloud-bigquerystorage-v1beta2 com.google.cloud google-cloud-bigquerystorage-parent - 2.32.1 + 2.33.0 diff --git a/pom.xml b/pom.xml index f18f3f3a93..8471a5447d 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.google.cloud google-cloud-bigquerystorage-parent pom - 2.32.1 + 2.33.0 BigQuery Storage Parent https://github.com/googleapis/java-bigquerystorage @@ -83,37 +83,37 @@ com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.156.1 + 0.157.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.156.1 + 0.157.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1 - 2.32.1 + 2.33.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.156.1 + 0.157.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.156.1 + 0.157.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 2.32.1 + 2.33.0 com.google.cloud google-cloud-bigquerystorage - 2.32.1 + 2.33.0 org.json diff --git a/proto-google-cloud-bigquerystorage-v1/pom.xml b/proto-google-cloud-bigquerystorage-v1/pom.xml index b48528a360..fda1d7686b 100644 --- a/proto-google-cloud-bigquerystorage-v1/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1 - 2.32.1 + 2.33.0 proto-google-cloud-bigquerystorage-v1 PROTO library for proto-google-cloud-bigquerystorage-v1 com.google.cloud google-cloud-bigquerystorage-parent - 2.32.1 + 2.33.0 diff --git a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml index ac706707ed..3c5c35f3f0 100644 --- a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.156.1 + 0.157.0 proto-google-cloud-bigquerystorage-v1beta1 PROTO library for proto-google-cloud-bigquerystorage-v1beta1 com.google.cloud google-cloud-bigquerystorage-parent - 2.32.1 + 2.33.0 diff --git a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml index 7251ccf20f..cd7466b7dc 100644 --- a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.156.1 + 0.157.0 proto-google-cloud-bigquerystorage-v1beta2 PROTO library for proto-google-cloud-bigquerystorage-v1beta2 com.google.cloud google-cloud-bigquerystorage-parent - 2.32.1 + 2.33.0 diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index 94fdb0e069..a277903578 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -29,7 +29,7 @@ com.google.cloud google-cloud-bigquerystorage - 2.32.1 + 2.33.0 diff --git a/versions.txt b/versions.txt index d17ebd8d7e..405c380f50 100644 --- a/versions.txt +++ b/versions.txt @@ -1,10 +1,10 @@ # Format: # module:released-version:current-version -google-cloud-bigquerystorage:2.32.1:2.32.1 -grpc-google-cloud-bigquerystorage-v1beta1:0.156.1:0.156.1 -grpc-google-cloud-bigquerystorage-v1beta2:0.156.1:0.156.1 -grpc-google-cloud-bigquerystorage-v1:2.32.1:2.32.1 -proto-google-cloud-bigquerystorage-v1beta1:0.156.1:0.156.1 -proto-google-cloud-bigquerystorage-v1beta2:0.156.1:0.156.1 -proto-google-cloud-bigquerystorage-v1:2.32.1:2.32.1 +google-cloud-bigquerystorage:2.33.0:2.33.0 +grpc-google-cloud-bigquerystorage-v1beta1:0.157.0:0.157.0 +grpc-google-cloud-bigquerystorage-v1beta2:0.157.0:0.157.0 +grpc-google-cloud-bigquerystorage-v1:2.33.0:2.33.0 +proto-google-cloud-bigquerystorage-v1beta1:0.157.0:0.157.0 +proto-google-cloud-bigquerystorage-v1beta2:0.157.0:0.157.0 +proto-google-cloud-bigquerystorage-v1:2.33.0:2.33.0