inflightRequests,
- int batchSizeBytes,
+ long batchSizeBytes,
String streamName,
Boolean attachSchema) {
this.inflightRequests = inflightRequests;
@@ -377,7 +378,7 @@ int count() {
return inflightRequests.size();
}
- int getByteSize() {
+ long getByteSize() {
return this.batchSizeBytes;
}
@@ -478,7 +479,9 @@ public void shutdown() {
currentAlarmFuture.cancel(false);
}
writeAllOutstanding();
- messagesWaiter.waitComplete();
+ synchronized (messagesWaiter) {
+ messagesWaiter.waitComplete();
+ }
if (clientStream.isSendReady()) {
clientStream.closeSend();
}
@@ -496,7 +499,7 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
}
/**
- * Constructs a new {@link Builder} using the given topic.
+ * Constructs a new {@link Builder} using the given stream.
*
* Example of creating a {@code WriteStream}.
*
@@ -514,7 +517,15 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
* }
*/
public static Builder newBuilder(String streamName) {
- return new Builder(streamName);
+ return new Builder(streamName, null);
+ }
+
+ /**
+ * Constructs a new {@link Builder} using the given stream and an existing BigQueryWriteClient.
+ */
+ public static Builder newBuilder(String streamName, BigQueryWriteClient client) {
+ Preconditions.checkArgument(client != null);
+ return new Builder(streamName, client);
}
/** A builder of {@link StreamWriter}s. */
@@ -523,9 +534,6 @@ public static final class Builder {
static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10);
// Meaningful defaults.
- static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L;
- static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 100 * 1024L; // 100 kB
- static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(10);
static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS =
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
@@ -534,9 +542,9 @@ public static final class Builder {
.build();
public static final BatchingSettings DEFAULT_BATCHING_SETTINGS =
BatchingSettings.newBuilder()
- .setDelayThreshold(DEFAULT_DELAY_THRESHOLD)
- .setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD)
- .setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD)
+ .setDelayThreshold(Duration.ofMillis(10))
+ .setRequestByteThreshold(100 * 1024L) // 100 kb
+ .setElementCountThreshold(100L)
.setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS)
.build();
public static final RetrySettings DEFAULT_RETRY_SETTINGS =
@@ -555,6 +563,8 @@ public static final class Builder {
private String streamName;
private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();
+ private BigQueryWriteClient client = null;
+
// Batching options
BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS;
@@ -569,8 +579,9 @@ public static final class Builder {
private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
- private Builder(String stream) {
+ private Builder(String stream, BigQueryWriteClient client) {
this.streamName = Preconditions.checkNotNull(stream);
+ this.client = client;
}
/**
@@ -771,11 +782,7 @@ public void onResponse(AppendRowsResponse response) {
inflightBatch.onSuccess(response);
}
} finally {
- synchronized (streamWriter.messagesWaiter) {
- streamWriter.messagesWaiter.incrementPendingCount(-1);
- streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
- streamWriter.messagesWaiter.notifyAll();
- }
+ streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
}
@@ -805,11 +812,11 @@ public void onError(Throwable t) {
&& !streamWriter.shutdown.get()) {
streamWriter.refreshAppend();
// Currently there is a bug that it took reconnected stream 5 seconds to pick up
- // stream count. So wait at least 5 seconds before sending a new request.
+ // stream count. So wait at least 7 seconds before sending a new request.
Thread.sleep(
Math.min(
streamWriter.getRetrySettings().getInitialRetryDelay().toMillis(),
- Duration.ofSeconds(5).toMillis()));
+ Duration.ofSeconds(7).toMillis()));
streamWriter.writeBatch(inflightBatch);
synchronized (streamWriter.currentRetries) {
streamWriter.currentRetries++;
@@ -837,11 +844,7 @@ public void onError(Throwable t) {
}
}
} finally {
- synchronized (streamWriter.messagesWaiter) {
- streamWriter.messagesWaiter.incrementPendingCount(-1);
- streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize());
- streamWriter.messagesWaiter.notifyAll();
- }
+ streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
}
}
};
@@ -849,7 +852,7 @@ public void onError(Throwable t) {
// This class controls how many messages are going to be sent out in a batch.
private static class MessagesBatch {
private List messages;
- private int batchedBytes;
+ private long batchedBytes;
private final BatchingSettings batchingSettings;
private Boolean attachSchema = true;
private final String streamName;
@@ -882,7 +885,7 @@ private boolean isEmpty() {
return messages.isEmpty();
}
- private int getBatchedBytes() {
+ private long getBatchedBytes() {
return batchedBytes;
}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java
index 0e15d6c726..43830ae021 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java
@@ -18,9 +18,11 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.FlowControlSettings;
-import com.google.api.gax.grpc.GrpcStatusCode;
-import com.google.api.gax.rpc.UnimplementedException;
-import io.grpc.Status;
+import com.google.api.gax.batching.FlowController;
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
/**
@@ -30,105 +32,146 @@
class Waiter {
private static final Logger LOG = Logger.getLogger(Waiter.class.getName());
- private int pendingCount;
- private int pendingSize;
- FlowControlSettings flowControlSettings;
+ private long pendingCount;
+ private long pendingSize;
+ private long countLimit;
+ private long sizeLimit;
+ private FlowController.LimitExceededBehavior behavior;
+ private LinkedList awaitingMessageAcquires;
+ private LinkedList awaitingBytesAcquires;
+ private final Lock lock;
Waiter(FlowControlSettings flowControlSettings) {
pendingCount = 0;
pendingSize = 0;
- this.flowControlSettings = flowControlSettings;
+ this.awaitingMessageAcquires = new LinkedList();
+ this.awaitingBytesAcquires = new LinkedList();
+ this.countLimit = flowControlSettings.getMaxOutstandingElementCount();
+ this.sizeLimit = flowControlSettings.getMaxOutstandingRequestBytes();
+ this.behavior = flowControlSettings.getLimitExceededBehavior();
+ this.lock = new ReentrantLock();
}
- public synchronized void incrementPendingCount(int delta) {
- this.pendingCount += delta;
- if (pendingCount == 0) {
- notifyAll();
+ private void notifyNextAcquires() {
+ if (!awaitingMessageAcquires.isEmpty()) {
+ CountDownLatch awaitingAcquire = awaitingMessageAcquires.getFirst();
+ awaitingAcquire.countDown();
+ }
+ if (!awaitingBytesAcquires.isEmpty()) {
+ CountDownLatch awaitingAcquire = awaitingBytesAcquires.getFirst();
+ awaitingAcquire.countDown();
}
}
- public synchronized void incrementPendingSize(int delta) {
- this.pendingSize += delta;
+ public synchronized void release(long messageSize) {
+ lock.lock();
+ --pendingCount;
+ pendingSize -= messageSize;
+ notifyNextAcquires();
+ lock.unlock();
+ notifyAll();
}
- private void wait(String message) {
- boolean interrupted = false;
+ public void acquire(long messageSize) throws FlowController.FlowControlException {
+ lock.lock();
try {
- LOG.fine("Wait on: " + message);
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
+ if (pendingCount >= countLimit
+ && behavior == FlowController.LimitExceededBehavior.ThrowException) {
+ throw new FlowController.MaxOutstandingElementCountReachedException(countLimit);
+ }
+ if (pendingSize + messageSize >= sizeLimit
+ && behavior == FlowController.LimitExceededBehavior.ThrowException) {
+ throw new FlowController.MaxOutstandingRequestBytesReachedException(sizeLimit);
+ }
- private void handleOverLimit(String message) {
- boolean interrupted = false;
- switch (this.flowControlSettings.getLimitExceededBehavior()) {
- case Block:
- wait(message);
- break;
- case ThrowException:
- throw new IllegalStateException("FlowControl limit exceeded: " + message);
- case Ignore:
- return;
- default:
- throw new UnimplementedException(
- "Unknown behavior setting: "
- + this.flowControlSettings.getLimitExceededBehavior().toString(),
- null,
- GrpcStatusCode.of(Status.Code.UNIMPLEMENTED),
- false);
- }
- }
+ CountDownLatch messageWaiter = null;
+ while (pendingCount >= countLimit) {
+ if (messageWaiter == null) {
+ messageWaiter = new CountDownLatch(1);
+ awaitingMessageAcquires.addLast(messageWaiter);
+ } else {
+ // This message already in line stays at the head of the line.
+ messageWaiter = new CountDownLatch(1);
+ awaitingMessageAcquires.set(0, messageWaiter);
+ }
+ lock.unlock();
+ try {
+ messageWaiter.await();
+ } catch (InterruptedException e) {
+ LOG.warning("Interrupted while waiting to acquire flow control tokens");
+ }
+ lock.lock();
+ }
+ ++pendingCount;
+ if (messageWaiter != null) {
+ awaitingMessageAcquires.removeFirst();
+ }
- public synchronized void waitOnElementCount() {
- LOG.finer(
- "Waiting on element count "
- + this.pendingCount
- + " "
- + this.flowControlSettings.getMaxOutstandingElementCount());
- while (this.pendingCount >= this.flowControlSettings.getMaxOutstandingElementCount()) {
- handleOverLimit("Element count");
- }
- }
+ if (!awaitingMessageAcquires.isEmpty() && pendingCount < countLimit) {
+ awaitingMessageAcquires.getFirst().countDown();
+ }
- public synchronized void waitOnSizeLimit(int incomingSize) {
- LOG.finer(
- "Waiting on size limit "
- + (this.pendingSize + incomingSize)
- + " "
- + this.flowControlSettings.getMaxOutstandingRequestBytes());
- while (this.pendingSize + incomingSize
- >= this.flowControlSettings.getMaxOutstandingRequestBytes()) {
- handleOverLimit("Byte size");
+ // Now acquire space for bytes.
+ CountDownLatch bytesWaiter = null;
+ Long bytesRemaining = messageSize;
+ while (pendingSize + messageSize >= sizeLimit) {
+ if (bytesWaiter == null) {
+ // This message gets added to the back of the line.
+ bytesWaiter = new CountDownLatch(1);
+ awaitingBytesAcquires.addLast(bytesWaiter);
+ } else {
+ // This message already in line stays at the head of the line.
+ bytesWaiter = new CountDownLatch(1);
+ awaitingBytesAcquires.set(0, bytesWaiter);
+ }
+ lock.unlock();
+ try {
+ bytesWaiter.await();
+ } catch (InterruptedException e) {
+ LOG.warning("Interrupted while waiting to acquire flow control tokens");
+ }
+ lock.lock();
+ }
+
+ pendingSize += messageSize;
+ if (bytesWaiter != null) {
+ awaitingBytesAcquires.removeFirst();
+ }
+ // There may be some surplus bytes left; let the next message waiting for bytes have some.
+ if (!awaitingBytesAcquires.isEmpty() && pendingSize < sizeLimit) {
+ awaitingBytesAcquires.getFirst().countDown();
+ }
+ } finally {
+ lock.unlock();
}
}
public synchronized void waitComplete() {
- boolean interrupted = false;
+ lock.lock();
try {
while (pendingCount > 0) {
+ lock.unlock();
try {
wait();
} catch (InterruptedException e) {
- // Ignored, uninterruptibly.
- interrupted = true;
+ LOG.warning("Interrupted while waiting for completion");
}
+ lock.lock();
}
+ } catch (Exception e) {
+ LOG.warning(e.toString());
} finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
+ lock.unlock();
}
}
@InternalApi
- public int pendingCount() {
+ public long pendingCount() {
return pendingCount;
}
@InternalApi
- public int pendingSize() {
+ public long pendingSize() {
return pendingSize;
}
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
index 38394a7479..950419fdc9 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
@@ -425,11 +425,12 @@ public void testFlowControlBehaviorException() throws Exception {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build());
ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"});
try {
- ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+ appendFuture2.get();
Assert.fail("This should fail");
- } catch (IllegalStateException e) {
- assertEquals("FlowControl limit exceeded: Element count", e.getMessage());
+ } catch (ExecutionException e) {
+ assertEquals("The maximum number of batch elements: 1 have been reached.", e.getMessage());
}
assertEquals(1L, appendFuture1.get().getOffset());
}
@@ -453,7 +454,7 @@ public void testStreamReconnection() throws Exception {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
ApiFuture future1 = sendTestMessage(writer, new String[] {"m1"});
assertEquals(false, future1.isDone());
- // Retry is scheduled to be 5 seconds later.
+ // Retry is scheduled to be 7 seconds later.
assertEquals(0L, future1.get().getOffset());
LOG.info("======CASE II");
@@ -469,16 +470,6 @@ public void testStreamReconnection() throws Exception {
}
LOG.info("======CASE III");
- // Writer needs to be recreated since the previous error is not recoverable.
- writer =
- getTestStreamWriterBuilder()
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setDelayThreshold(Duration.ofSeconds(100000))
- .setElementCountThreshold(1L)
- .build())
- .build();
// Case 3: Failed after retried max retry times.
testBigQueryWrite.addException(transientError);
testBigQueryWrite.addException(transientError);
@@ -614,14 +605,9 @@ public void testWriterGetters() throws Exception {
public void testBuilderParametersAndDefaults() {
StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM);
assertEquals(StreamWriter.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
- assertEquals(
- StreamWriter.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD,
- builder.batchingSettings.getRequestByteThreshold().longValue());
- assertEquals(
- StreamWriter.Builder.DEFAULT_DELAY_THRESHOLD, builder.batchingSettings.getDelayThreshold());
- assertEquals(
- StreamWriter.Builder.DEFAULT_ELEMENT_COUNT_THRESHOLD,
- builder.batchingSettings.getElementCountThreshold().longValue());
+ assertEquals(100 * 1024L, builder.batchingSettings.getRequestByteThreshold().longValue());
+ assertEquals(Duration.ofMillis(10), builder.batchingSettings.getDelayThreshold());
+ assertEquals(100L, builder.batchingSettings.getElementCountThreshold().longValue());
assertEquals(StreamWriter.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings);
assertEquals(Duration.ofMillis(100), builder.retrySettings.getInitialRetryDelay());
assertEquals(3, builder.retrySettings.getMaxAttempts());
@@ -814,4 +800,17 @@ public void testClose() throws Exception {
assertEquals("Cannot shut down a writer already shut-down.", e.getMessage());
}
}
+
+ @Test
+ public void testExistingClient() throws Exception {
+ BigQueryWriteSettings settings =
+ BigQueryWriteSettings.newBuilder()
+ .setTransportChannelProvider(channelProvider)
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .build();
+ BigQueryWriteClient client = BigQueryWriteClient.create(settings);
+ StreamWriter writer = StreamWriter.newBuilder(TEST_STREAM, client).build();
+ writer.close();
+ assertFalse(client.isShutdown());
+ }
}
diff --git a/grpc-google-cloud-bigquerystorage-v1/pom.xml b/grpc-google-cloud-bigquerystorage-v1/pom.xml
index f9b408e76e..f47279fc0a 100644
--- a/grpc-google-cloud-bigquerystorage-v1/pom.xml
+++ b/grpc-google-cloud-bigquerystorage-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1
- 0.95.0
+ 0.96.0
grpc-google-cloud-bigquerystorage-v1
GRPC library for grpc-google-cloud-bigquerystorage-v1
com.google.cloud
google-cloud-bigquerystorage-parent
- 0.130.0-beta
+ 0.131.0-beta
diff --git a/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml b/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml
index 32be0f0dc4..6b954591f7 100644
--- a/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml
+++ b/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1alpha2
- 0.95.0
+ 0.96.0
grpc-google-cloud-bigquerystorage-v1alpha2
GRPC library for grpc-google-cloud-bigquerystorage-v1alpha2
com.google.cloud
google-cloud-bigquerystorage-parent
- 0.130.0-beta
+ 0.131.0-beta
diff --git a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml
index 8eed0569a2..efaa6ad929 100644
--- a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml
+++ b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta1
- 0.95.0
+ 0.96.0
grpc-google-cloud-bigquerystorage-v1beta1
GRPC library for grpc-google-cloud-bigquerystorage-v1beta1
com.google.cloud
google-cloud-bigquerystorage-parent
- 0.130.0-beta
+ 0.131.0-beta
diff --git a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml
index 952bd47dab..bcf08340fc 100644
--- a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml
+++ b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta2
- 0.95.0
+ 0.96.0
grpc-google-cloud-bigquerystorage-v1beta2
GRPC library for grpc-google-cloud-bigquerystorage-v1beta2
com.google.cloud
google-cloud-bigquerystorage-parent
- 0.130.0-beta
+ 0.131.0-beta
diff --git a/pom.xml b/pom.xml
index f42e7e73f2..c7c0a7c742 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
com.google.cloud
google-cloud-bigquerystorage-parent
pom
- 0.130.0-beta
+ 0.131.0-beta
BigQuery Storage Parent
https://github.com/googleapis/java-bigquerystorage
@@ -64,7 +64,7 @@
3.11.4
github
google-cloud-bigquerystorage-parent
- 2.10.3
+ 2.11.0
3.5