Accessed via proxy:   [ UP ]
[Report a bug]   [Cookie settings]
Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 472a36f

Browse files
Authored by: egreco12 (Evan Greco) and gcf-owl-bot[bot]
feat: Exponentially backoff on INTERNAL errors for Default streams (#2358)
* feat: Exponentially backoff on INTERNAL errors for Default streams
* chore: Move testDefaultRequestLimit over to retry file as it uses the same project

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Evan Greco <egreco@google.com>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 714cda9 commit 472a36f

Showing 5 changed files with 161 additions and 51 deletions.

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
5050
If you are using Gradle 5.x or later, add this to your dependencies:
5151

5252
```Groovy
53-
implementation platform('com.google.cloud:libraries-bom:26.27.0')
53+
implementation platform('com.google.cloud:libraries-bom:26.29.0')
5454
5555
implementation 'com.google.cloud:google-cloud-bigquerystorage'
5656
```

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r
970970
lock.lock();
971971
try {
972972
requestWrapper.retryCount++;
973-
if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) {
973+
if (this.retrySettings != null && useBackoffForError(errorCode, streamName)) {
974974
// Trigger exponential backoff in append loop when request is resent for quota errors.
975975
// createNextAttempt correctly initializes the retry delay; createfirstAttempt does not
976976
// include a positive delay, just 0.
@@ -1148,6 +1148,17 @@ private boolean isConnectionErrorRetriable(Code statusCode) {
11481148
|| statusCode == Code.DEADLINE_EXCEEDED;
11491149
}
11501150

1151+
private boolean useBackoffForError(Code statusCode, String streamName) {
1152+
// Default stream uses backoff for INTERNAL, as THROTTLED errors are more likely with default
1153+
// streams. RESOURCE_EXHAUSTED streams are used for backoff for each stream type.
1154+
if (isDefaultStreamName(streamName)) {
1155+
if (statusCode == Code.INTERNAL) {
1156+
return true;
1157+
}
1158+
}
1159+
return statusCode == Code.RESOURCE_EXHAUSTED;
1160+
}
1161+
11511162
private void doneCallback(Throwable finalStatus) {
11521163
log.info(
11531164
"Received done callback. Stream: "

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2047,6 +2047,44 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except
20472047
}
20482048
}
20492049

2050+
@Test
2051+
public void testAppendInternalErrorRetryExponentialBackoff() throws Exception {
2052+
StreamWriter writer = getTestStreamWriterRetryEnabled();
2053+
2054+
testBigQueryWrite.addResponse(
2055+
new DummyResponseSupplierWillFailThenSucceed(
2056+
new FakeBigQueryWriteImpl.Response(createAppendResponse(0)),
2057+
/* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1,
2058+
com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build()));
2059+
2060+
ApiFuture<AppendRowsResponse> future =
2061+
writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0);
2062+
2063+
ExecutionException ex =
2064+
assertThrows(
2065+
ExecutionException.class,
2066+
() -> {
2067+
future.get();
2068+
});
2069+
assertEquals(
2070+
Status.Code.INTERNAL, ((StatusRuntimeException) ex.getCause()).getStatus().getCode());
2071+
2072+
ArrayList<Instant> instants = testBigQueryWrite.getLatestRequestReceivedInstants();
2073+
Instant previousInstant = instants.get(0);
2074+
// Include initial attempt
2075+
assertEquals(instants.size(), MAX_RETRY_NUM_ATTEMPTS + 1);
2076+
double minExpectedDelay = INITIAL_RETRY_MILLIS * 0.95;
2077+
for (int i = 1; i < instants.size(); i++) {
2078+
Instant currentInstant = instants.get(i);
2079+
double differenceInMillis =
2080+
java.time.Duration.between(previousInstant, currentInstant).toMillis();
2081+
assertThat(differenceInMillis).isAtLeast((double) INITIAL_RETRY_MILLIS);
2082+
assertThat(differenceInMillis).isGreaterThan(minExpectedDelay);
2083+
minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER;
2084+
previousInstant = currentInstant;
2085+
}
2086+
}
2087+
20502088
@Test
20512089
public void testAppendSuccessAndNonRetryableError() throws Exception {
20522090
StreamWriter writer = getTestStreamWriterRetryEnabled();

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,53 +1608,4 @@ public void testLargeRequest() throws IOException, InterruptedException, Executi
16081608
assertEquals("50", queryIter.next().get(0).getStringValue());
16091609
}
16101610
}
1611-
1612-
@Test
1613-
public void testDefaultRequestLimit()
1614-
throws IOException, InterruptedException, ExecutionException {
1615-
DatasetId datasetId =
1616-
DatasetId.of("bq-write-api-java-retry-test", RemoteBigQueryHelper.generateDatasetName());
1617-
DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build();
1618-
bigquery.create(datasetInfo);
1619-
try {
1620-
String tableName = "no_error_table";
1621-
TableId tableId = TableId.of(datasetId.getProject(), datasetId.getDataset(), tableName);
1622-
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
1623-
Schema originalSchema = Schema.of(col1);
1624-
TableInfo tableInfo =
1625-
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
1626-
bigquery.create(tableInfo);
1627-
TableName parent = TableName.of(datasetId.getProject(), datasetId.getDataset(), tableName);
1628-
try (StreamWriter streamWriter =
1629-
StreamWriter.newBuilder(parent.toString() + "/_default")
1630-
.setWriterSchema(CreateProtoSchemaWithColField())
1631-
.build()) {
1632-
ApiFuture<AppendRowsResponse> response =
1633-
streamWriter.append(
1634-
CreateProtoRows(
1635-
new String[] {new String(new char[19 * 1024 * 1024]).replace("\0", "a")}));
1636-
try {
1637-
response.get();
1638-
Assert.fail("Large request should fail with InvalidArgumentError");
1639-
} catch (ExecutionException ex) {
1640-
assertEquals(io.grpc.StatusRuntimeException.class, ex.getCause().getClass());
1641-
io.grpc.StatusRuntimeException actualError =
1642-
(io.grpc.StatusRuntimeException) ex.getCause();
1643-
// This verifies that the Beam connector can consume this custom exception's grpc
1644-
// StatusCode
1645-
// TODO(yiru): temp fix to unblock test, while final fix is being rolled out.
1646-
if (actualError.getStatus().getCode() != Code.INTERNAL) {
1647-
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
1648-
assertThat(
1649-
actualError
1650-
.getStatus()
1651-
.getDescription()
1652-
.contains("AppendRows request too large: 19923131 limit 10485760"));
1653-
}
1654-
}
1655-
}
1656-
} finally {
1657-
RemoteBigQueryHelper.forceDelete(bigquery, datasetId.toString());
1658-
}
1659-
}
16601611
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,38 @@
1616

1717
package com.google.cloud.bigquery.storage.v1.it;
1818

19+
import static com.google.common.truth.Truth.assertThat;
20+
import static org.junit.Assert.assertEquals;
21+
22+
import com.google.api.core.ApiFuture;
1923
import com.google.cloud.bigquery.BigQuery;
24+
import com.google.cloud.bigquery.DatasetId;
2025
import com.google.cloud.bigquery.DatasetInfo;
2126
import com.google.cloud.bigquery.Field;
2227
import com.google.cloud.bigquery.LegacySQLTypeName;
2328
import com.google.cloud.bigquery.Schema;
29+
import com.google.cloud.bigquery.StandardSQLTypeName;
2430
import com.google.cloud.bigquery.StandardTableDefinition;
2531
import com.google.cloud.bigquery.TableId;
2632
import com.google.cloud.bigquery.TableInfo;
33+
import com.google.cloud.bigquery.storage.test.Test.FooType;
34+
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
2735
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
36+
import com.google.cloud.bigquery.storage.v1.ProtoRows;
37+
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
38+
import com.google.cloud.bigquery.storage.v1.StreamWriter;
39+
import com.google.cloud.bigquery.storage.v1.TableName;
2840
import com.google.cloud.bigquery.storage.v1.WriteStream;
2941
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
42+
import com.google.protobuf.DescriptorProtos.DescriptorProto;
43+
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
3044
import com.google.protobuf.Descriptors.DescriptorValidationException;
45+
import io.grpc.Status.Code;
3146
import java.io.IOException;
47+
import java.util.concurrent.ExecutionException;
3248
import java.util.logging.Logger;
3349
import org.junit.AfterClass;
50+
import org.junit.Assert;
3451
import org.junit.BeforeClass;
3552
import org.junit.Test;
3653

@@ -80,6 +97,15 @@ public static void afterClass() {
8097
}
8198
}
8299

100+
ProtoRows CreateProtoRows(String[] messages) {
101+
ProtoRows.Builder rows = ProtoRows.newBuilder();
102+
for (String message : messages) {
103+
FooType foo = FooType.newBuilder().setFoo(message).build();
104+
rows.addSerializedRows(foo.toByteString());
105+
}
106+
return rows.build();
107+
}
108+
83109
@Test
84110
public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry()
85111
throws IOException, InterruptedException, DescriptorValidationException {
@@ -104,4 +130,88 @@ public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry()
104130
/* requestCount=*/ 901,
105131
/* rowBatchSize=*/ 1);
106132
}
133+
134+
// Moved to ITBigQueryWriteNonQuotaRetryTest from ITBigQueryWriteManualClientTest, as it requires
135+
// usage of the project this file uses to inject errors (bq-write-api-java-retry-test).
136+
@Test
137+
public void testDefaultRequestLimit()
138+
throws IOException, InterruptedException, ExecutionException {
139+
DatasetId datasetId =
140+
DatasetId.of(NON_QUOTA_RETRY_PROJECT_ID, RemoteBigQueryHelper.generateDatasetName());
141+
DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build();
142+
bigquery.create(datasetInfo);
143+
try {
144+
String tableName = "no_error_table";
145+
TableId tableId = TableId.of(datasetId.getProject(), datasetId.getDataset(), tableName);
146+
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
147+
Schema originalSchema = Schema.of(col1);
148+
TableInfo tableInfo =
149+
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
150+
bigquery.create(tableInfo);
151+
ProtoSchema schema =
152+
ProtoSchema.newBuilder()
153+
.setProtoDescriptor(
154+
DescriptorProto.newBuilder()
155+
.setName("testProto")
156+
.addField(
157+
FieldDescriptorProto.newBuilder()
158+
.setName("col1")
159+
.setNumber(1)
160+
.setType(FieldDescriptorProto.Type.TYPE_STRING)
161+
.build())
162+
.build())
163+
.build();
164+
TableName parent = TableName.of(datasetId.getProject(), datasetId.getDataset(), tableName);
165+
try (StreamWriter streamWriter =
166+
StreamWriter.newBuilder(parent.toString() + "/_default")
167+
.setWriterSchema(schema)
168+
.build()) {
169+
ApiFuture<AppendRowsResponse> response =
170+
streamWriter.append(
171+
CreateProtoRows(
172+
new String[] {new String(new char[19 * 1024 * 1024]).replace("\0", "a")}));
173+
try {
174+
AppendRowsResponse resp = response.get();
175+
LOG.info(
176+
"Message succeded. Dataset info: "
177+
+ datasetInfo.toString()
178+
+ " tableinfo: "
179+
+ tableInfo.toString()
180+
+ " parent: "
181+
+ parent
182+
+ "streamWriter: "
183+
+ streamWriter.toString()
184+
+ "response: "
185+
+ resp);
186+
Assert.fail("Large request should fail with InvalidArgumentError");
187+
} catch (ExecutionException ex) {
188+
LOG.info(
189+
"Message failed. Dataset info: "
190+
+ datasetInfo.toString()
191+
+ " tableinfo: "
192+
+ tableInfo.toString()
193+
+ " parent: "
194+
+ parent
195+
+ "streamWriter: "
196+
+ streamWriter);
197+
assertEquals(io.grpc.StatusRuntimeException.class, ex.getCause().getClass());
198+
io.grpc.StatusRuntimeException actualError =
199+
(io.grpc.StatusRuntimeException) ex.getCause();
200+
// This verifies that the Beam connector can consume this custom exception's grpc
201+
// StatusCode
202+
// TODO(yiru): temp fix to unblock test, while final fix is being rolled out.
203+
if (actualError.getStatus().getCode() != Code.INTERNAL) {
204+
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
205+
assertThat(
206+
actualError
207+
.getStatus()
208+
.getDescription()
209+
.contains("AppendRows request too large: 19923131 limit 10485760"));
210+
}
211+
}
212+
}
213+
} finally {
214+
RemoteBigQueryHelper.forceDelete(bigquery, datasetId.toString());
215+
}
216+
}
107217
}

0 commit comments

Comments
 (0)