2020import com .google .api .gax .core .CredentialsProvider ;
2121import com .google .api .gax .rpc .FixedHeaderProvider ;
2222import com .google .api .gax .rpc .TransportChannelProvider ;
23+ import com .google .cloud .bigquery .storage .util .Errors ;
2324import com .google .cloud .bigquery .storage .v1 .AppendRowsRequest .ProtoData ;
2425import com .google .cloud .bigquery .storage .v1 .StreamConnection .DoneCallback ;
2526import com .google .cloud .bigquery .storage .v1 .StreamConnection .RequestCallback ;
@@ -90,6 +91,26 @@ public class StreamWriter implements AutoCloseable {
9091 @ GuardedBy ("lock" )
9192 private long inflightBytes = 0 ;
9293
94+ /*
95+ * Tracks how often the stream was closed due to a retriable error. Streaming will stop when the
96+ * count hits a threshold. Streaming should only be halted, if it isn't possible to establish a
97+ * connection. Keep track of the number of reconnections in succession. This will be reset if
98+ * a row is successfully called back.
99+ */
100+ @ GuardedBy ("lock" )
101+ private long conectionRetryCountWithoutCallback = 0 ;
102+
103+ /*
104+ * If false, streamConnection needs to be reset.
105+ */
106+ @ GuardedBy ("lock" )
107+ private boolean streamConnectionIsConnected = false ;
108+
109+ /*
110+ * Retry threshold, limits how often the connection is retried before processing halts.
111+ */
112+ private static final long RETRY_THRESHOLD = 3 ;
113+
93114 /*
94115 * Indicates whether user has called Close() or not.
95116 */
@@ -173,6 +194,18 @@ private StreamWriter(Builder builder) throws IOException {
173194 this .ownsBigQueryWriteClient = false ;
174195 }
175196
197+ this .appendThread =
198+ new Thread (
199+ new Runnable () {
200+ @ Override
201+ public void run () {
202+ appendLoop ();
203+ }
204+ });
205+ this .appendThread .start ();
206+ }
207+
208+ private void resetConnection () {
176209 this .streamConnection =
177210 new StreamConnection (
178211 this .client ,
@@ -188,15 +221,6 @@ public void run(Throwable finalStatus) {
188221 doneCallback (finalStatus );
189222 }
190223 });
191- this .appendThread =
192- new Thread (
193- new Runnable () {
194- @ Override
195- public void run () {
196- appendLoop ();
197- }
198- });
199- this .appendThread .start ();
200224 }
201225
202226 /**
@@ -331,12 +355,27 @@ public void close() {
331355 * It takes requests from waiting queue and sends them to server.
332356 */
333357 private void appendLoop () {
334- boolean isFirstRequestInConnection = true ;
335358 Deque <AppendRequestAndResponse > localQueue = new LinkedList <AppendRequestAndResponse >();
359+ boolean streamNeedsConnecting = false ;
360+ // Set isFirstRequestInConnection to true immediately after connecting the stream;
361+ // it indicates that the next row sent needs the schema and other metadata.
362+ boolean isFirstRequestInConnection = true ;
336363 while (!waitingQueueDrained ()) {
337364 this .lock .lock ();
338365 try {
339366 hasMessageInWaitingQueue .await (100 , TimeUnit .MILLISECONDS );
367+ // Copy the streamConnectionIsConnected guarded by lock to a local variable.
368+ // In addition, only reconnect if there is a retriable error.
369+ streamNeedsConnecting = !streamConnectionIsConnected && connectionFinalStatus == null ;
370+ if (streamNeedsConnecting ) {
371+ // If the stream connection is broken, any requests on inflightRequestQueue will need
372+ // to be resent, as the new connection has no knowledge of the requests. Copy the requests
373+ // from inflightRequestQueue and prepend them onto the waitingRequestQueue. They need to be
374+ // prepended as they need to be sent before new requests.
375+ while (!inflightRequestQueue .isEmpty ()) {
376+ waitingRequestQueue .addFirst (inflightRequestQueue .pollLast ());
377+ }
378+ }
340379 while (!this .waitingRequestQueue .isEmpty ()) {
341380 AppendRequestAndResponse requestWrapper = this .waitingRequestQueue .pollFirst ();
342381 this .inflightRequestQueue .addLast (requestWrapper );
@@ -355,12 +394,34 @@ private void appendLoop() {
355394 if (localQueue .isEmpty ()) {
356395 continue ;
357396 }
358-
359- // TODO: Add reconnection here.
397+ if (streamNeedsConnecting ) {
398+ // Set streamConnectionIsConnected to true, to indicate the stream has been connected. This
399+ // must happen before the call to resetConnection, because it is unknown when the connection
400+ // could be closed and the doneCallback called, which clears the flag again.
401+ lock .lock ();
402+ try {
403+ this .streamConnectionIsConnected = true ;
404+ } finally {
405+ lock .unlock ();
406+ }
407+ resetConnection ();
408+ // Set firstRequestInConnection to indicate the next request to be sent should include
409+ // metadata.
410+ isFirstRequestInConnection = true ;
411+ }
360412 while (!localQueue .isEmpty ()) {
361413 AppendRowsRequest preparedRequest =
362414 prepareRequestBasedOnPosition (
363415 localQueue .pollFirst ().message , isFirstRequestInConnection );
416+ // Send should only throw an exception if there is a problem with the request. The catch
417+ // block will handle this case, and return the exception with the result.
418+ // Otherwise send will return:
419+ // SUCCESS: Message was sent, wait for the callback.
420+ // STREAM_CLOSED: Stream was closed, normally or due to an error
421+ // NOT_ENOUGH_QUOTA: Message wasn't sent due to not enough quota.
422+ // TODO: Handle NOT_ENOUGH_QUOTA.
423+ // In the close case, the request is in the inflight queue, and will either be returned
424+ // to the user with an error, or will be resent.
364425 this .streamConnection .send (preparedRequest );
365426 isFirstRequestInConnection = false ;
366427 }
@@ -369,8 +430,10 @@ private void appendLoop() {
369430 log .fine ("Cleanup starts. Stream: " + streamName );
370431 // At this point, the waiting queue is drained, so no more requests.
371432 // We can close the stream connection and handle the remaining inflight requests.
372- this .streamConnection .close ();
373- waitForDoneCallback ();
433+ if (streamConnection != null ) {
434+ this .streamConnection .close ();
435+ waitForDoneCallback ();
436+ }
374437
375438 // At this point, there cannot be more callback. It is safe to clean up all inflight requests.
376439 log .fine (
@@ -455,6 +518,12 @@ private void requestCallback(AppendRowsResponse response) {
455518 AppendRequestAndResponse requestWrapper ;
456519 this .lock .lock ();
457520 try {
521+ // Had a successful connection with at least one result, reset retries.
522+ // conectionRetryCountWithoutCallback is reset so that only multiple retries, without
523+ // successful records sent, will cause the stream to fail.
524+ if (conectionRetryCountWithoutCallback != 0 ) {
525+ conectionRetryCountWithoutCallback = 0 ;
526+ }
458527 requestWrapper = pollInflightRequestQueue ();
459528 } finally {
460529 this .lock .unlock ();
@@ -476,6 +545,14 @@ private void requestCallback(AppendRowsResponse response) {
476545 }
477546 }
478547
548+ private boolean isRetriableError (Throwable t ) {
549+ Status status = Status .fromThrowable (t );
550+ if (Errors .isRetryableInternalStatus (status )) {
551+ return true ;
552+ }
553+ return status .getCode () == Status .Code .ABORTED || status .getCode () == Status .Code .UNAVAILABLE ;
554+ }
555+
479556 private void doneCallback (Throwable finalStatus ) {
480557 log .fine (
481558 "Received done callback. Stream: "
@@ -484,7 +561,26 @@ private void doneCallback(Throwable finalStatus) {
484561 + finalStatus .toString ());
485562 this .lock .lock ();
486563 try {
487- this .connectionFinalStatus = finalStatus ;
564+ this .streamConnectionIsConnected = false ;
565+ if (connectionFinalStatus == null ) {
566+ // If the error can be retried, don't set it here, let it try to retry later on.
567+ if (isRetriableError (finalStatus )
568+ && conectionRetryCountWithoutCallback < RETRY_THRESHOLD
569+ && !userClosed ) {
570+ this .conectionRetryCountWithoutCallback ++;
571+ log .fine (
572+ "Retriable error "
573+ + finalStatus .toString ()
574+ + " received, retry count "
575+ + conectionRetryCountWithoutCallback
576+ + " for stream "
577+ + streamName );
578+ } else {
579+ this .connectionFinalStatus = finalStatus ;
580+ log .info (
581+ "Stream finished with error " + finalStatus .toString () + " for stream " + streamName );
582+ }
583+ }
488584 } finally {
489585 this .lock .unlock ();
490586 }
0 commit comments