diff --git a/CHANGELOG.md b/CHANGELOG.md
index f2e0ded423..857eb0f6c2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,20 @@
# Changelog
+## [2.33.0](https://github.com/googleapis/java-bigquerystorage/compare/v2.32.1...v2.33.0) (2023-03-01)
+
+
+### Features
+
+* Add header back to the client ([#2016](https://github.com/googleapis/java-bigquerystorage/issues/2016)) ([de00447](https://github.com/googleapis/java-bigquerystorage/commit/de00447958e5939d7be9d0f7da02323aabbfed8c))
+
+
+### Bug Fixes
+
+* Add client shutdown if request waiting in request queue for too long. ([#2017](https://github.com/googleapis/java-bigquerystorage/issues/2017)) ([91da88b](https://github.com/googleapis/java-bigquerystorage/commit/91da88b0ed914bf55111dd9cef2a3fc4b27c3443))
+* Allow StreamWriter settings to override passed in BQ client setting ([#2001](https://github.com/googleapis/java-bigquerystorage/issues/2001)) ([66db8fe](https://github.com/googleapis/java-bigquerystorage/commit/66db8fed26474076fb5aaca5044d39e11f6ef28d))
+* Catch uncaught exception from append loop and add expoential retry to reconnection ([#2015](https://github.com/googleapis/java-bigquerystorage/issues/2015)) ([35db0fb](https://github.com/googleapis/java-bigquerystorage/commit/35db0fb38a929a8f3e4db30ee173ce5a4af43d64))
+* Remove write_location header pending discussion ([#2021](https://github.com/googleapis/java-bigquerystorage/issues/2021)) ([0941d43](https://github.com/googleapis/java-bigquerystorage/commit/0941d4363daf782e0be81c11fdf6a2fe0ff4d7ac))
+
## [2.32.1](https://github.com/googleapis/java-bigquerystorage/compare/v2.32.0...v2.32.1) (2023-02-22)
diff --git a/README.md b/README.md
index ee2d3069f1..08735dc791 100644
--- a/README.md
+++ b/README.md
@@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:
```Groovy
-implementation platform('com.google.cloud:libraries-bom:26.8.0')
+implementation platform('com.google.cloud:libraries-bom:26.9.0')
implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
-implementation 'com.google.cloud:google-cloud-bigquerystorage:2.32.0'
+implementation 'com.google.cloud:google-cloud-bigquerystorage:2.32.1'
```
If you are using SBT, add this to your dependencies:
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.32.0"
+libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.32.1"
```
## Authentication
diff --git a/google-cloud-bigquerystorage-bom/pom.xml b/google-cloud-bigquerystorage-bom/pom.xml
index d61f682280..1119e01e9d 100644
--- a/google-cloud-bigquerystorage-bom/pom.xml
+++ b/google-cloud-bigquerystorage-bom/pom.xml
@@ -3,7 +3,7 @@
4.0.0
com.google.cloud
google-cloud-bigquerystorage-bom
- 2.32.1
+ 2.33.0
pom
com.google.cloud
@@ -52,37 +52,37 @@
com.google.cloud
google-cloud-bigquerystorage
- 2.32.1
+ 2.33.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta1
- 0.156.1
+ 0.157.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta2
- 0.156.1
+ 0.157.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1
- 2.32.1
+ 2.33.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1beta1
- 0.156.1
+ 0.157.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1beta2
- 0.156.1
+ 0.157.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1
- 2.32.1
+ 2.33.0
diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml
index 5914aa2586..a09d94c1a0 100644
--- a/google-cloud-bigquerystorage/pom.xml
+++ b/google-cloud-bigquerystorage/pom.xml
@@ -3,7 +3,7 @@
4.0.0
com.google.cloud
google-cloud-bigquerystorage
- 2.32.1
+ 2.33.0
jar
BigQuery Storage
https://github.com/googleapis/java-bigquerystorage
@@ -11,7 +11,7 @@
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.32.1
+ 2.33.0
google-cloud-bigquerystorage
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
index 8eab8bfdde..6da6950d3a 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java
@@ -18,6 +18,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
@@ -32,6 +33,7 @@
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
+import java.time.Instant;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
@@ -65,6 +67,14 @@ class ConnectionWorker implements AutoCloseable {
// Maximum wait time on inflight quota before error out.
private static long INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = 300000;
+ /*
+ * Maximum time waiting for request callback before shutting down the connection.
+ *
+ * We will constantly checking how much time we have been waiting for the next request callback
+ * if we wait too much time we will start shutting down the connections and clean up the queues.
+ */
+ private static Duration MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = Duration.ofMinutes(15);
+
private Lock lock;
private Condition hasMessageInWaitingQueue;
private Condition inflightReduced;
@@ -77,6 +87,11 @@ class ConnectionWorker implements AutoCloseable {
*/
private String streamName;
+ /*
+ * The location of this connection.
+ */
+ private String location = null;
+
/*
* The proto schema of rows to write. This schema can change during multiplexing.
*/
@@ -198,6 +213,12 @@ class ConnectionWorker implements AutoCloseable {
*/
private final String writerId = UUID.randomUUID().toString();
+ /*
+ * Test only exception behavior testing params.
+ */
+ private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null;
+ private long testOnlyAppendLoopSleepTime = 0;
+
/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
@@ -205,6 +226,7 @@ public static long getApiMaxRequestBytes() {
public ConnectionWorker(
String streamName,
+ String location,
ProtoSchema writerSchema,
long maxInflightRequests,
long maxInflightBytes,
@@ -217,6 +239,9 @@ public ConnectionWorker(
this.hasMessageInWaitingQueue = lock.newCondition();
this.inflightReduced = lock.newCondition();
this.streamName = streamName;
+ if (location != null && !location.isEmpty()) {
+ this.location = location;
+ }
this.maxRetryDuration = maxRetryDuration;
if (writerSchema == null) {
throw new StatusRuntimeException(
@@ -230,6 +255,16 @@ public ConnectionWorker(
this.waitingRequestQueue = new LinkedList();
this.inflightRequestQueue = new LinkedList();
// Always recreate a client for connection worker.
+ HashMap newHeaders = new HashMap<>();
+ newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
+ if (this.location == null) {
+ newHeaders.put("x-goog-request-params", "write_stream=" + this.streamName);
+ }
+ BigQueryWriteSettings stubSettings =
+ clientSettings
+ .toBuilder()
+ .setHeaderProvider(FixedHeaderProvider.create(newHeaders))
+ .build();
this.client = BigQueryWriteClient.create(clientSettings);
this.appendThread =
@@ -240,6 +275,24 @@ public void run() {
appendLoop();
}
});
+ appendThread.setUncaughtExceptionHandler(
+ (Thread t, Throwable e) -> {
+ log.warning(
+ "Exception thrown from append loop, thus stream writer is shutdown due to exception: "
+ + e.toString());
+ lock.lock();
+ try {
+ connectionFinalStatus = e;
+ // Move all current waiting requests to in flight queue.
+ while (!this.waitingRequestQueue.isEmpty()) {
+ AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
+ this.inflightRequestQueue.addLast(requestWrapper);
+ }
+ } finally {
+ lock.unlock();
+ }
+ cleanupInflightRequests();
+ });
this.appendThread.start();
}
@@ -249,6 +302,8 @@ private void resetConnection() {
// It's safe to directly close the previous connection as the in flight messages
// will be picked up by the next connection.
this.streamConnection.close();
+ Uninterruptibles.sleepUninterruptibly(
+ calculateSleepTimeMilli(conectionRetryCountWithoutCallback), TimeUnit.MILLISECONDS);
}
this.streamConnection =
new StreamConnection(
@@ -270,6 +325,24 @@ public void run(Throwable finalStatus) {
/** Schedules the writing of rows at given offset. */
ApiFuture append(StreamWriter streamWriter, ProtoRows rows, long offset) {
+ if (this.location != null && this.location != streamWriter.getLocation()) {
+ throw new StatusRuntimeException(
+ Status.fromCode(Code.INVALID_ARGUMENT)
+ .withDescription(
+ "StreamWriter with location "
+ + streamWriter.getLocation()
+ + " is scheduled to use a connection with location "
+ + this.location));
+ } else if (this.location == null && streamWriter.getStreamName() != this.streamName) {
+ // Location is null implies this is non-multiplexed connection.
+ throw new StatusRuntimeException(
+ Status.fromCode(Code.INVALID_ARGUMENT)
+ .withDescription(
+ "StreamWriter with stream name "
+ + streamWriter.getStreamName()
+ + " is scheduled to use a connection with stream name "
+ + this.streamName));
+ }
Preconditions.checkNotNull(streamWriter);
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
requestBuilder.setProtoRows(
@@ -295,6 +368,10 @@ Boolean isUserClosed() {
}
}
+ String getWriteLocation() {
+ return this.location;
+ }
+
private ApiFuture appendInternal(
StreamWriter streamWriter, AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message, streamWriter);
@@ -391,6 +468,22 @@ private void maybeWaitForInflightQuota() {
inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000);
}
+ @VisibleForTesting
+ static long calculateSleepTimeMilli(long retryCount) {
+ return Math.min((long) Math.pow(2, retryCount), 60000);
+ }
+
+ @VisibleForTesting
+ void setTestOnlyAppendLoopSleepTime(long testOnlyAppendLoopSleepTime) {
+ this.testOnlyAppendLoopSleepTime = testOnlyAppendLoopSleepTime;
+ }
+
+ @VisibleForTesting
+ void setTestOnlyRunTimeExceptionInAppendLoop(
+ RuntimeException testOnlyRunTimeExceptionInAppendLoop) {
+ this.testOnlyRunTimeExceptionInAppendLoop = testOnlyRunTimeExceptionInAppendLoop;
+ }
+
public long getInflightWaitSeconds() {
return inflightWaitSec.longValue();
}
@@ -420,7 +513,7 @@ public void close() {
} finally {
this.lock.unlock();
}
- log.fine("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId);
+ log.info("Waiting for append thread to finish. Stream: " + streamName + " id: " + writerId);
try {
appendThread.join();
} catch (InterruptedException e) {
@@ -438,6 +531,7 @@ public void close() {
// Backend request has a 2 minute timeout, so wait a little longer than that.
this.client.awaitTermination(150, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
+ log.warning("Client await termination timeout in writer id " + writerId);
}
try {
@@ -482,6 +576,11 @@ private void appendLoop() {
this.lock.lock();
try {
hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
+ // Check whether we should error out the current append loop.
+ if (inflightRequestQueue.size() > 0) {
+ throwIfWaitCallbackTooLong(inflightRequestQueue.getFirst().requestCreationTimeStamp);
+ }
+
// Copy the streamConnectionIsConnected guarded by lock to a local variable.
// In addition, only reconnect if there is a retriable error.
streamNeedsConnecting = !streamConnectionIsConnected && connectionFinalStatus == null;
@@ -496,6 +595,7 @@ private void appendLoop() {
}
while (!this.waitingRequestQueue.isEmpty()) {
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
+ requestWrapper.trySetRequestInsertQueueTime();
this.inflightRequestQueue.addLast(requestWrapper);
localQueue.addLast(requestWrapper);
}
@@ -524,6 +624,10 @@ private void appendLoop() {
} finally {
lock.unlock();
}
+ if (testOnlyRunTimeExceptionInAppendLoop != null) {
+ Uninterruptibles.sleepUninterruptibly(testOnlyAppendLoopSleepTime, TimeUnit.MILLISECONDS);
+ throw testOnlyRunTimeExceptionInAppendLoop;
+ }
resetConnection();
// Set firstRequestInConnection to indicate the next request to be sent should include
// metedata. Reset everytime after reconnection.
@@ -612,6 +716,17 @@ private void appendLoop() {
log.info("Append thread is done. Stream: " + streamName + " id: " + writerId);
}
+ private void throwIfWaitCallbackTooLong(Instant timeToCheck) {
+ Duration milliSinceLastCallback = Duration.between(timeToCheck, Instant.now());
+ if (milliSinceLastCallback.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) > 0) {
+ throw new RuntimeException(
+ String.format(
+ "Request has waited in inflight queue for %sms for writer %s, "
+ + "which is over maximum wait time %s",
+ milliSinceLastCallback, writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME.toString()));
+ }
+ }
+
/*
* Returns true if waiting queue is drain, a.k.a. no more requests in the waiting queue.
*
@@ -649,6 +764,7 @@ private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
}
this.lock.lock();
try {
+ log.warning("Donecallback is not triggered within timeout frame for writer " + writerId);
if (connectionFinalStatus == null) {
connectionFinalStatus =
new StatusRuntimeException(
@@ -792,7 +908,7 @@ private boolean isConnectionErrorRetriable(Throwable t) {
}
private void doneCallback(Throwable finalStatus) {
- log.fine(
+ log.info(
"Received done callback. Stream: "
+ streamName
+ " worker id: "
@@ -832,7 +948,9 @@ private void doneCallback(Throwable finalStatus) {
"Connection finished with error "
+ finalStatus.toString()
+ " for stream "
- + streamName);
+ + streamName
+ + " with write id: "
+ + writerId);
}
}
} finally {
@@ -864,12 +982,21 @@ static final class AppendRequestAndResponse {
// The writer that issues the call of the request.
final StreamWriter streamWriter;
+ Instant requestCreationTimeStamp;
+
AppendRequestAndResponse(AppendRowsRequest message, StreamWriter streamWriter) {
this.appendResult = SettableApiFuture.create();
this.message = message;
this.messageSize = message.getProtoRows().getSerializedSize();
this.streamWriter = streamWriter;
}
+
+ void trySetRequestInsertQueueTime() {
+ // Only set the first time the caller tries to set the timestamp.
+ if (requestCreationTimeStamp == null) {
+ requestCreationTimeStamp = Instant.now();
+ }
+ }
}
/** Returns the current workload of this worker. */
@@ -960,6 +1087,11 @@ static void setMaxInflightQueueWaitTime(long waitTime) {
INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = waitTime;
}
+ @VisibleForTesting
+ static void setMaxInflightRequestWaitTime(Duration waitTime) {
+ MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime;
+ }
+
@AutoValue
abstract static class TableSchemaAndTimestamp {
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
index 8fcb84165e..83be8ce52a 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
@@ -288,7 +288,8 @@ private ConnectionWorker createOrReuseConnectionWorker(
String streamReference = streamWriter.getStreamName();
if (connectionWorkerPool.size() < currentMaxConnectionCount) {
// Always create a new connection if we haven't reached current maximum.
- return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
+ return createConnectionWorker(
+ streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema());
} else {
ConnectionWorker existingBestConnection =
pickBestLoadConnection(
@@ -304,7 +305,10 @@ private ConnectionWorker createOrReuseConnectionWorker(
if (currentMaxConnectionCount > settings.maxConnectionsPerRegion()) {
currentMaxConnectionCount = settings.maxConnectionsPerRegion();
}
- return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
+ return createConnectionWorker(
+ streamWriter.getStreamName(),
+ streamWriter.getLocation(),
+ streamWriter.getProtoSchema());
} else {
// Stick to the original connection if all the connections are overwhelmed.
if (existingConnectionWorker != null) {
@@ -359,8 +363,8 @@ static ConnectionWorker pickBestLoadConnection(
* a single stream reference. This is because createConnectionWorker(...) is called via
* computeIfAbsent(...) which is at most once per key.
*/
- private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema writeSchema)
- throws IOException {
+ private ConnectionWorker createConnectionWorker(
+ String streamName, String location, ProtoSchema writeSchema) throws IOException {
if (enableTesting) {
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
testValueCreateConnectionCount.getAndIncrement();
@@ -368,6 +372,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
ConnectionWorker connectionWorker =
new ConnectionWorker(
streamName,
+ location,
writeSchema,
maxInflightRequests,
maxInflightBytes,
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
index ff965f0477..b21a52a63d 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -208,6 +208,7 @@ private StreamWriter(Builder builder) throws IOException {
SingleConnectionOrConnectionPool.ofSingleConnection(
new ConnectionWorker(
builder.streamName,
+ builder.location,
builder.writerSchema,
builder.maxInflightRequest,
builder.maxInflightBytes,
@@ -308,17 +309,37 @@ static boolean isDefaultStream(String streamName) {
return streamMatcher.find();
}
- private BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
+ static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
+ BigQueryWriteSettings.Builder settingsBuilder = null;
if (builder.client != null) {
- return builder.client.getSettings();
+ settingsBuilder = builder.client.getSettings().toBuilder();
} else {
- return BigQueryWriteSettings.newBuilder()
- .setCredentialsProvider(builder.credentialsProvider)
- .setTransportChannelProvider(builder.channelProvider)
- .setBackgroundExecutorProvider(builder.executorProvider)
- .setEndpoint(builder.endpoint)
- .build();
+ settingsBuilder =
+ new BigQueryWriteSettings.Builder()
+ .setTransportChannelProvider(
+ BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
+ .setChannelsPerCpu(1)
+ .build())
+ .setCredentialsProvider(
+ BigQueryWriteSettings.defaultCredentialsProviderBuilder().build())
+ .setBackgroundExecutorProvider(
+ BigQueryWriteSettings.defaultExecutorProviderBuilder().build())
+ .setEndpoint(BigQueryWriteSettings.getDefaultEndpoint());
}
+ if (builder.channelProvider != null) {
+ settingsBuilder.setTransportChannelProvider(builder.channelProvider);
+ }
+ if (builder.credentialsProvider != null) {
+ settingsBuilder.setCredentialsProvider(builder.credentialsProvider);
+ }
+ if (builder.executorProvider != null) {
+ settingsBuilder.setBackgroundExecutorProvider(builder.executorProvider);
+ }
+ if (builder.endpoint != null) {
+ settingsBuilder.setEndpoint(builder.endpoint);
+ }
+
+ return settingsBuilder.build();
}
// Validate whether the fetched connection pool matched certain properties.
@@ -542,16 +563,13 @@ public static final class Builder {
private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES;
- private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();
+ private String endpoint = null;
- private TransportChannelProvider channelProvider =
- BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
+ private TransportChannelProvider channelProvider = null;
- private CredentialsProvider credentialsProvider =
- BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
+ private CredentialsProvider credentialsProvider = null;
- private ExecutorProvider executorProvider =
- BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
+ private ExecutorProvider executorProvider = null;
private FlowController.LimitExceededBehavior limitExceededBehavior =
FlowController.LimitExceededBehavior.Block;
@@ -633,7 +651,8 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
/** {@code ExecutorProvider} to use to create Executor to run background jobs. */
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
- this.executorProvider = executorProvider;
+ this.executorProvider =
+ Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null.");
return this;
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
index 980772b2ff..e558d567c8 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
@@ -430,6 +430,7 @@ private StreamWriter getTestStreamWriter(String streamName) throws IOException {
return StreamWriter.newBuilder(streamName)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
+ .setLocation("us")
.setCredentialsProvider(NoCredentialsProvider.create())
.setChannelProvider(serviceHelper.createChannelProvider())
.build();
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
index 4edf0f3e9d..cb4e05ab20 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java
@@ -38,6 +38,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Logger;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -45,6 +47,7 @@
@RunWith(JUnit4.class)
public class ConnectionWorkerTest {
+ private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/s1";
private static final String TEST_STREAM_2 = "projects/p2/datasets/d2/tables/t2/streams/s2";
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
@@ -58,6 +61,7 @@ public class ConnectionWorkerTest {
public void setUp() throws Exception {
testBigQueryWrite = new FakeBigQueryWrite();
ConnectionWorker.setMaxInflightQueueWaitTime(300000);
+ ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofMinutes(10));
serviceHelper =
new MockServiceHelper(
UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite));
@@ -83,10 +87,12 @@ public void testMultiplexedAppendSuccess() throws Exception {
StreamWriter sw1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema("foo"))
+ .setLocation("us")
.build();
StreamWriter sw2 =
StreamWriter.newBuilder(TEST_STREAM_2, client)
.setWriterSchema(createProtoSchema("complicate"))
+ .setLocation("us")
.build();
// We do a pattern of:
// send to stream1, string1
@@ -204,11 +210,20 @@ public void testAppendInSameStream_switchSchema() throws Exception {
// send to stream1, schema1
// ...
StreamWriter sw1 =
- StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
+ StreamWriter.newBuilder(TEST_STREAM_1, client)
+ .setLocation("us")
+ .setWriterSchema(schema1)
+ .build();
StreamWriter sw2 =
- StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema2).build();
+ StreamWriter.newBuilder(TEST_STREAM_1, client)
+ .setLocation("us")
+ .setWriterSchema(schema2)
+ .build();
StreamWriter sw3 =
- StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema3).build();
+ StreamWriter.newBuilder(TEST_STREAM_1, client)
+ .setLocation("us")
+ .setWriterSchema(schema3)
+ .build();
for (long i = 0; i < appendCount; i++) {
switch ((int) i % 4) {
case 0:
@@ -304,10 +319,14 @@ public void testAppendInSameStream_switchSchema() throws Exception {
public void testAppendButInflightQueueFull() throws Exception {
ProtoSchema schema1 = createProtoSchema("foo");
StreamWriter sw1 =
- StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
+ StreamWriter.newBuilder(TEST_STREAM_1, client)
+ .setLocation("us")
+ .setWriterSchema(schema1)
+ .build();
ConnectionWorker connectionWorker =
new ConnectionWorker(
TEST_STREAM_1,
+ "us",
createProtoSchema("foo"),
6,
100000,
@@ -351,6 +370,139 @@ public void testAppendButInflightQueueFull() throws Exception {
}
}
+ @Test
+ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
+ ProtoSchema schema1 = createProtoSchema("foo");
+ StreamWriter sw1 =
+ StreamWriter.newBuilder(TEST_STREAM_1, client)
+ .setLocation("us")
+ .setWriterSchema(schema1)
+ .build();
+ ConnectionWorker connectionWorker =
+ new ConnectionWorker(
+ TEST_STREAM_1,
+ "us",
+ createProtoSchema("foo"),
+ 100000,
+ 100000,
+ Duration.ofSeconds(100),
+ FlowController.LimitExceededBehavior.Block,
+ TEST_TRACE_ID,
+ client.getSettings());
+ testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
+ ConnectionWorker.setMaxInflightQueueWaitTime(500);
+
+ long appendCount = 10;
+ for (int i = 0; i < appendCount; i++) {
+ testBigQueryWrite.addResponse(createAppendResponse(i));
+ }
+ connectionWorker.setTestOnlyRunTimeExceptionInAppendLoop(
+ new RuntimeException("Any exception can happen."));
+ // Sleep 1 second before erroring out.
+ connectionWorker.setTestOnlyAppendLoopSleepTime(1000L);
+
+ // In total insert 5 requests,
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < appendCount; i++) {
+ futures.add(
+ sendTestMessage(
+ connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
+ assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1);
+ }
+
+ for (int i = 0; i < appendCount; i++) {
+ int finalI = i;
+ ExecutionException ex =
+ assertThrows(
+ ExecutionException.class,
+ () -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
+ assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen.");
+ }
+
+ // The future append will directly fail.
+ ExecutionException ex =
+ assertThrows(
+ ExecutionException.class,
+ () ->
+ sendTestMessage(
+ connectionWorker,
+ sw1,
+ createFooProtoRows(new String[] {String.valueOf(100)}),
+ 100)
+ .get());
+ assertThat(ex.getCause()).hasMessageThat().contains("Any exception can happen.");
+ }
+
+ @Test
+ public void testLocationMismatch() throws Exception {
+ ProtoSchema schema1 = createProtoSchema("foo");
+ StreamWriter sw1 =
+ StreamWriter.newBuilder(TEST_STREAM_1, client)
+ .setWriterSchema(schema1)
+ .setLocation("eu")
+ .build();
+ ConnectionWorker connectionWorker =
+ new ConnectionWorker(
+ TEST_STREAM_1,
+ "us",
+ createProtoSchema("foo"),
+ 100000,
+ 100000,
+ Duration.ofSeconds(100),
+ FlowController.LimitExceededBehavior.Block,
+ TEST_TRACE_ID,
+ client.getSettings());
+ StatusRuntimeException ex =
+ assertThrows(
+ StatusRuntimeException.class,
+ () ->
+ sendTestMessage(
+ connectionWorker,
+ sw1,
+ createFooProtoRows(new String[] {String.valueOf(0)}),
+ 0));
+ assertEquals(
+ "INVALID_ARGUMENT: StreamWriter with location eu is scheduled to use a connection with location us",
+ ex.getMessage());
+ }
+
+ @Test
+ public void testStreamNameMismatch() throws Exception {
+ ProtoSchema schema1 = createProtoSchema("foo");
+ StreamWriter sw1 =
+ StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
+ ConnectionWorker connectionWorker =
+ new ConnectionWorker(
+ TEST_STREAM_2,
+ null,
+ createProtoSchema("foo"),
+ 100000,
+ 100000,
+ Duration.ofSeconds(100),
+ FlowController.LimitExceededBehavior.Block,
+ TEST_TRACE_ID,
+ client.getSettings());
+ StatusRuntimeException ex =
+ assertThrows(
+ StatusRuntimeException.class,
+ () ->
+ sendTestMessage(
+ connectionWorker,
+ sw1,
+ createFooProtoRows(new String[] {String.valueOf(0)}),
+ 0));
+ assertEquals(
+ "INVALID_ARGUMENT: StreamWriter with stream name projects/p1/datasets/d1/tables/t1/streams/s1 is scheduled to use a connection with stream name projects/p2/datasets/d2/tables/t2/streams/s2",
+ ex.getMessage());
+ }
+
+ @Test
+ public void testExponentialBackoff() throws Exception {
+ assertThat(ConnectionWorker.calculateSleepTimeMilli(0)).isEqualTo(1);
+ assertThat(ConnectionWorker.calculateSleepTimeMilli(5)).isEqualTo(32);
+ assertThat(ConnectionWorker.calculateSleepTimeMilli(100)).isEqualTo(60000);
+ }
+
private AppendRowsResponse createAppendResponse(long offset) {
return AppendRowsResponse.newBuilder()
.setAppendResult(
@@ -373,6 +525,7 @@ private ConnectionWorker createConnectionWorker(
throws IOException {
return new ConnectionWorker(
streamName,
+ "us",
createProtoSchema("foo"),
maxRequests,
maxBytes,
@@ -455,4 +608,106 @@ public void testLoadIsOverWhelmed() {
Load load2 = ConnectionWorker.Load.create(1, 1, 100, 100, 100);
assertThat(load2.isOverwhelmed()).isFalse();
}
+
+ @Test
+ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
+ ProtoSchema schema1 = createProtoSchema("foo");
+ ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofSeconds(1));
+ StreamWriter sw1 =
+ StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
+ ConnectionWorker connectionWorker =
+ new ConnectionWorker(
+ TEST_STREAM_1,
+ null,
+ createProtoSchema("foo"),
+ 100000,
+ 100000,
+ Duration.ofSeconds(100),
+ FlowController.LimitExceededBehavior.Block,
+ TEST_TRACE_ID,
+ client.getSettings());
+ testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));
+
+ long appendCount = 10;
+ for (int i = 0; i < appendCount; i++) {
+ testBigQueryWrite.addResponse(createAppendResponse(i));
+ }
+
+ // In total insert 5 requests,
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < appendCount; i++) {
+ futures.add(
+ sendTestMessage(
+ connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
+ assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), i + 1);
+ }
+
+ for (int i = 0; i < appendCount; i++) {
+ int finalI = i;
+ ExecutionException ex =
+ assertThrows(
+ ExecutionException.class,
+ () -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
+ assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
+ }
+
+ // The future append will directly fail.
+ ExecutionException ex =
+ assertThrows(
+ ExecutionException.class,
+ () ->
+ sendTestMessage(
+ connectionWorker,
+ sw1,
+ createFooProtoRows(new String[] {String.valueOf(100)}),
+ 100)
+ .get());
+ assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
+ }
+
+ @Test
+ public void testLongTimeIdleWontFail() throws Exception {
+ ProtoSchema schema1 = createProtoSchema("foo");
+ ConnectionWorker.setMaxInflightRequestWaitTime(Duration.ofSeconds(1));
+ StreamWriter sw1 =
+ StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
+ ConnectionWorker connectionWorker =
+ new ConnectionWorker(
+ TEST_STREAM_1,
+ null,
+ createProtoSchema("foo"),
+ 100000,
+ 100000,
+ Duration.ofSeconds(100),
+ FlowController.LimitExceededBehavior.Block,
+ TEST_TRACE_ID,
+ client.getSettings());
+
+ long appendCount = 10;
+ for (int i = 0; i < appendCount * 2; i++) {
+ testBigQueryWrite.addResponse(createAppendResponse(i));
+ }
+
+ // In total insert 5 requests,
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < appendCount; i++) {
+ futures.add(
+ sendTestMessage(
+ connectionWorker, sw1, createFooProtoRows(new String[] {String.valueOf(i)}), i));
+ }
+ // Sleep 2 seconds to make sure request queue is empty.
+ Thread.sleep(2000);
+ assertEquals(connectionWorker.getLoad().inFlightRequestsCount(), 0);
+ for (int i = 0; i < appendCount; i++) {
+ futures.add(
+ sendTestMessage(
+ connectionWorker,
+ sw1,
+ createFooProtoRows(new String[] {String.valueOf(i)}),
+ i + appendCount));
+ }
+ for (int i = 0; i < appendCount * 2; i++) {
+ assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
+ }
+ }
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
index 43c5fd2bea..af36273102 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
@@ -25,7 +25,10 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.core.GoogleCredentialsProvider;
+import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.AbortedException;
@@ -1366,4 +1369,79 @@ public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception {
assertTrue(ex.getCause() instanceof InvalidArgumentException);
assertFalse(writer.isUserClosed());
}
+
+ @Test(timeout = 10000)
+ public void testBuilderDefaultSetting() throws Exception {
+ StreamWriter.Builder writerBuilder = StreamWriter.newBuilder(TEST_STREAM_1);
+ BigQueryWriteSettings writeSettings = StreamWriter.getBigQueryWriteSettings(writerBuilder);
+ assertEquals(
+ BigQueryWriteSettings.defaultExecutorProviderBuilder().build().toString(),
+ writeSettings.getBackgroundExecutorProvider().toString());
+ assertEquals(
+ BigQueryWriteSettings.defaultCredentialsProviderBuilder().build().toString(),
+ writeSettings.getCredentialsProvider().toString());
+ assertTrue(
+ writeSettings.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider);
+ assertEquals(
+ BigQueryWriteSettings.getDefaultEndpoint(), writeSettings.getEndpoint().toString());
+ }
+
+ @Test(timeout = 10000)
+ public void testBuilderExplicitSetting() throws Exception {
+ // Client has special seetings.
+ BigQueryWriteSettings clientSettings =
+ BigQueryWriteSettings.newBuilder()
+ .setEndpoint("xxx:345")
+ .setBackgroundExecutorProvider(
+ InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build())
+ .setTransportChannelProvider(serviceHelper.createChannelProvider())
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .build();
+ BigQueryWriteClient client = BigQueryWriteClient.create(clientSettings);
+ StreamWriter.Builder writerWithClient = StreamWriter.newBuilder(TEST_STREAM_1, client);
+ BigQueryWriteSettings writerSettings = StreamWriter.getBigQueryWriteSettings(writerWithClient);
+ assertEquals("xxx:345", writerSettings.getEndpoint());
+ assertTrue(
+ writerSettings.getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider);
+ assertEquals(
+ 4,
+ ((InstantiatingExecutorProvider) writerSettings.getBackgroundExecutorProvider())
+ .getExecutorThreadCount());
+
+ // Explicit setting on StreamWriter is respected.
+ StreamWriter.Builder writerWithClientWithOverrides =
+ StreamWriter.newBuilder(TEST_STREAM_1, client)
+ .setEndpoint("yyy:345")
+ .setExecutorProvider(
+ InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(14).build())
+ .setChannelProvider(
+ BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
+ .setKeepAliveTimeout(Duration.ofSeconds(500))
+ .build())
+ .setCredentialsProvider(
+ BigQueryWriteSettings.defaultCredentialsProviderBuilder()
+ .setScopesToApply(Arrays.asList("A", "B"))
+ .build());
+ BigQueryWriteSettings writerSettings2 =
+ StreamWriter.getBigQueryWriteSettings(writerWithClientWithOverrides);
+ assertEquals("yyy:345", writerSettings2.getEndpoint());
+ assertTrue(
+ writerSettings2.getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider);
+ assertEquals(
+ 14,
+ ((InstantiatingExecutorProvider) writerSettings2.getBackgroundExecutorProvider())
+ .getExecutorThreadCount());
+ assertTrue(
+ writerSettings2.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider);
+ assertEquals(
+ Duration.ofSeconds(500),
+ ((InstantiatingGrpcChannelProvider) writerSettings2.getTransportChannelProvider())
+ .getKeepAliveTimeout());
+ assertTrue(writerSettings2.getCredentialsProvider() instanceof GoogleCredentialsProvider);
+ assertEquals(
+ 2,
+ ((GoogleCredentialsProvider) writerSettings2.getCredentialsProvider())
+ .getScopesToApply()
+ .size());
+ }
}
diff --git a/grpc-google-cloud-bigquerystorage-v1/pom.xml b/grpc-google-cloud-bigquerystorage-v1/pom.xml
index 2fcca3c433..57d3ac1474 100644
--- a/grpc-google-cloud-bigquerystorage-v1/pom.xml
+++ b/grpc-google-cloud-bigquerystorage-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1
- 2.32.1
+ 2.33.0
grpc-google-cloud-bigquerystorage-v1
GRPC library for grpc-google-cloud-bigquerystorage-v1
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.32.1
+ 2.33.0
diff --git a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml
index dad213bb44..0197ad5490 100644
--- a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml
+++ b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta1
- 0.156.1
+ 0.157.0
grpc-google-cloud-bigquerystorage-v1beta1
GRPC library for grpc-google-cloud-bigquerystorage-v1beta1
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.32.1
+ 2.33.0
diff --git a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml
index 7ca8666dd3..e8d9f1075e 100644
--- a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml
+++ b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta2
- 0.156.1
+ 0.157.0
grpc-google-cloud-bigquerystorage-v1beta2
GRPC library for grpc-google-cloud-bigquerystorage-v1beta2
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.32.1
+ 2.33.0
diff --git a/pom.xml b/pom.xml
index f18f3f3a93..8471a5447d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
com.google.cloud
google-cloud-bigquerystorage-parent
pom
- 2.32.1
+ 2.33.0
BigQuery Storage Parent
https://github.com/googleapis/java-bigquerystorage
@@ -83,37 +83,37 @@
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1beta1
- 0.156.1
+ 0.157.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1beta2
- 0.156.1
+ 0.157.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1
- 2.32.1
+ 2.33.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta1
- 0.156.1
+ 0.157.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta2
- 0.156.1
+ 0.157.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1
- 2.32.1
+ 2.33.0
com.google.cloud
google-cloud-bigquerystorage
- 2.32.1
+ 2.33.0
org.json
diff --git a/proto-google-cloud-bigquerystorage-v1/pom.xml b/proto-google-cloud-bigquerystorage-v1/pom.xml
index b48528a360..fda1d7686b 100644
--- a/proto-google-cloud-bigquerystorage-v1/pom.xml
+++ b/proto-google-cloud-bigquerystorage-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1
- 2.32.1
+ 2.33.0
proto-google-cloud-bigquerystorage-v1
PROTO library for proto-google-cloud-bigquerystorage-v1
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.32.1
+ 2.33.0
diff --git a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml
index ac706707ed..3c5c35f3f0 100644
--- a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml
+++ b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1beta1
- 0.156.1
+ 0.157.0
proto-google-cloud-bigquerystorage-v1beta1
PROTO library for proto-google-cloud-bigquerystorage-v1beta1
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.32.1
+ 2.33.0
diff --git a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml
index 7251ccf20f..cd7466b7dc 100644
--- a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml
+++ b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1beta2
- 0.156.1
+ 0.157.0
proto-google-cloud-bigquerystorage-v1beta2
PROTO library for proto-google-cloud-bigquerystorage-v1beta2
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.32.1
+ 2.33.0
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 94fdb0e069..a277903578 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -29,7 +29,7 @@
com.google.cloud
google-cloud-bigquerystorage
- 2.32.1
+ 2.33.0
diff --git a/versions.txt b/versions.txt
index d17ebd8d7e..405c380f50 100644
--- a/versions.txt
+++ b/versions.txt
@@ -1,10 +1,10 @@
# Format:
# module:released-version:current-version
-google-cloud-bigquerystorage:2.32.1:2.32.1
-grpc-google-cloud-bigquerystorage-v1beta1:0.156.1:0.156.1
-grpc-google-cloud-bigquerystorage-v1beta2:0.156.1:0.156.1
-grpc-google-cloud-bigquerystorage-v1:2.32.1:2.32.1
-proto-google-cloud-bigquerystorage-v1beta1:0.156.1:0.156.1
-proto-google-cloud-bigquerystorage-v1beta2:0.156.1:0.156.1
-proto-google-cloud-bigquerystorage-v1:2.32.1:2.32.1
+google-cloud-bigquerystorage:2.33.0:2.33.0
+grpc-google-cloud-bigquerystorage-v1beta1:0.157.0:0.157.0
+grpc-google-cloud-bigquerystorage-v1beta2:0.157.0:0.157.0
+grpc-google-cloud-bigquerystorage-v1:2.33.0:2.33.0
+proto-google-cloud-bigquerystorage-v1beta1:0.157.0:0.157.0
+proto-google-cloud-bigquerystorage-v1beta2:0.157.0:0.157.0
+proto-google-cloud-bigquerystorage-v1:2.33.0:2.33.0