Lokasi ngalangkungan proxy:   [ UP ]  
[Ngawartoskeun bug]   [Panyetelan cookie]                
Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit f8f9770

Browse files
authored
fix: StreamWriter hang when we reach the inflight limit control and is doing a retry (#799)
* fix:a hang in StreamWriter when inflight request reached a limit and we try to resend a message * . * . * . * . * .
1 parent 8c4cec8 commit f8f9770

3 files changed

Lines changed: 54 additions & 40 deletions

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,14 @@ private StreamWriter(Builder builder)
187187
this.onSchemaUpdateRunnable.setStreamWriter(this);
188188
}
189189

190-
refreshAppend();
190+
bidiStreamingCallable = stub.appendRowsCallable();
191+
clientStream = bidiStreamingCallable.splitCall(responseObserver);
192+
try {
193+
while (!clientStream.isSendReady()) {
194+
Thread.sleep(10);
195+
}
196+
} catch (InterruptedException e) {
197+
}
191198
}
192199

193200
/** Stream name we are writing to. */
@@ -296,9 +303,9 @@ public void flushAll(long timeoutMillis) throws Exception {
296303
/**
297304
* Re-establishes a stream connection.
298305
*
299-
* @throws IOException
306+
* @throws InterruptedException
300307
*/
301-
public void refreshAppend() throws IOException, InterruptedException {
308+
public void refreshAppend() throws InterruptedException {
302309
appendAndRefreshAppendLock.lock();
303310
if (shutdown.get()) {
304311
LOG.warning("Cannot refresh on a already shutdown writer.");
@@ -313,11 +320,8 @@ public void refreshAppend() throws IOException, InterruptedException {
313320
messagesBatch.resetAttachSchema();
314321
bidiStreamingCallable = stub.appendRowsCallable();
315322
clientStream = bidiStreamingCallable.splitCall(responseObserver);
316-
try {
317-
while (!clientStream.isSendReady()) {
318-
Thread.sleep(10);
319-
}
320-
} catch (InterruptedException expected) {
323+
while (!clientStream.isSendReady()) {
324+
Thread.sleep(10);
321325
}
322326
Thread.sleep(this.retrySettings.getInitialRetryDelay().toMillis());
323327
// Can only unlock here since need to sleep the full 7 seconds before stream can allow appends.
@@ -922,41 +926,43 @@ public void onError(Throwable t) {
922926
}
923927
inflightBatch = this.inflightBatches.poll();
924928
}
925-
try {
926-
if (isRecoverableError(t)) {
927-
try {
928-
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
929-
&& !streamWriter.shutdown.get()) {
930-
streamWriter.refreshAppend();
931-
LOG.info("Resending requests on transient error:" + streamWriter.currentRetries);
932-
streamWriter.writeBatch(inflightBatch);
933-
synchronized (streamWriter.currentRetries) {
934-
streamWriter.currentRetries++;
935-
}
936-
} else {
937-
inflightBatch.onFailure(t);
938-
abortInflightRequests(t);
939-
synchronized (streamWriter.currentRetries) {
940-
streamWriter.currentRetries = 0;
941-
}
929+
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
930+
if (isRecoverableError(t)) {
931+
try {
932+
if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts()
933+
&& !streamWriter.shutdown.get()) {
934+
synchronized (streamWriter.currentRetries) {
935+
streamWriter.currentRetries++;
942936
}
943-
} catch (IOException | InterruptedException e) {
944-
LOG.info("Got exception while retrying.");
945-
inflightBatch.onFailure(e);
946-
abortInflightRequests(e);
937+
LOG.info(
938+
"Try to reestablish connection due to transient error: "
939+
+ t.toString()
940+
+ " retry times: "
941+
+ streamWriter.currentRetries);
942+
streamWriter.refreshAppend();
943+
LOG.info("Resending requests on after connection established");
944+
streamWriter.writeBatch(inflightBatch);
945+
} else {
946+
inflightBatch.onFailure(t);
947+
abortInflightRequests(t);
947948
synchronized (streamWriter.currentRetries) {
948949
streamWriter.currentRetries = 0;
949950
}
950951
}
951-
} else {
952-
inflightBatch.onFailure(t);
953-
abortInflightRequests(t);
952+
} catch (InterruptedException e) {
953+
LOG.info("Got exception while retrying: " + e.toString());
954+
inflightBatch.onFailure(new StatusRuntimeException(Status.ABORTED));
955+
abortInflightRequests(new StatusRuntimeException(Status.ABORTED));
954956
synchronized (streamWriter.currentRetries) {
955957
streamWriter.currentRetries = 0;
956958
}
957959
}
958-
} finally {
959-
streamWriter.messagesWaiter.release(inflightBatch.getByteSize());
960+
} else {
961+
inflightBatch.onFailure(t);
962+
abortInflightRequests(t);
963+
synchronized (streamWriter.currentRetries) {
964+
streamWriter.currentRetries = 0;
965+
}
960966
}
961967
}
962968
};

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -855,7 +855,6 @@ public void testFlushAll() throws Exception {
855855
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
856856
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
857857

858-
assertFalse(appendFuture3.isDone());
859858
writer.flushAll(100000);
860859

861860
assertTrue(appendFuture3.isDone());

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -519,21 +519,30 @@ public void testStreamReconnectionTransient() throws Exception {
519519
.toBuilder()
520520
.setDelayThreshold(Duration.ofSeconds(100000))
521521
.setElementCountThreshold(1L)
522+
.setFlowControlSettings(
523+
StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
524+
.toBuilder()
525+
.setMaxOutstandingElementCount(1L)
526+
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
527+
.build())
522528
.build())
523529
.build();
524530

525-
StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
526-
testBigQueryWrite.addException(transientError);
527531
testBigQueryWrite.addResponse(
528532
AppendRowsResponse.newBuilder()
529533
.setAppendResult(
530534
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
531535
.build());
536+
testBigQueryWrite.addException(new StatusRuntimeException(Status.UNAVAILABLE));
537+
testBigQueryWrite.addResponse(
538+
AppendRowsResponse.newBuilder()
539+
.setAppendResult(
540+
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
541+
.build());
532542
ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"m1"});
533-
assertEquals(false, future1.isDone());
534-
// Retry is scheduled to be 7 seconds later.
543+
ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"m1"});
535544
assertEquals(0L, future1.get().getAppendResult().getOffset().getValue());
536-
future1.get();
545+
assertEquals(1L, future2.get().getAppendResult().getOffset().getValue());
537546
writer.close();
538547
}
539548

0 commit comments

Comments
 (0)