|
28 | 28 | import com.google.cloud.bigquery.Schema; |
29 | 29 | import com.google.cloud.bigquery.storage.test.Test.*; |
30 | 30 | import com.google.cloud.bigquery.storage.v1.*; |
| 31 | +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; |
31 | 32 | import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists; |
32 | 33 | import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange; |
33 | 34 | import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException; |
@@ -313,6 +314,73 @@ public void testJsonStreamWriterCommittedStream() |
313 | 314 | } |
314 | 315 | } |
315 | 316 |
|
| 317 | + @Test |
| 318 | + public void testRowErrors() |
| 319 | + throws IOException, InterruptedException, ExecutionException, |
| 320 | + Descriptors.DescriptorValidationException { |
| 321 | + String tableName = "_default"; |
| 322 | + TableInfo tableInfo = |
| 323 | + TableInfo.newBuilder( |
| 324 | + TableId.of(DATASET, tableName), |
| 325 | + StandardTableDefinition.of( |
| 326 | + Schema.of( |
| 327 | + com.google.cloud.bigquery.Field.newBuilder( |
| 328 | + "foo", StandardSQLTypeName.STRING) |
| 329 | + .setMaxLength(10L) |
| 330 | + .build()))) |
| 331 | + .build(); |
| 332 | + bigquery.create(tableInfo); |
| 333 | + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); |
| 334 | + WriteStream writeStream = |
| 335 | + client.createWriteStream( |
| 336 | + CreateWriteStreamRequest.newBuilder() |
| 337 | + .setParent(parent.toString()) |
| 338 | + .setWriteStream( |
| 339 | + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) |
| 340 | + .build()); |
| 341 | + StreamWriter streamWriter = |
| 342 | + StreamWriter.newBuilder(writeStream.getName()) |
| 343 | + .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) |
| 344 | + .build(); |
| 345 | + LOG.info("Sending three messages"); |
| 346 | + ApiFuture<AppendRowsResponse> futureResponse = |
| 347 | + streamWriter.append( |
| 348 | + CreateProtoRows(new String[] {"aaabbbcccddd", "bbb", "cccdddeeefffggg"}), -1); |
| 349 | + AppendRowsResponse actualResponse = null; |
| 350 | + try { |
| 351 | + actualResponse = futureResponse.get(); |
| 352 | + } catch (Throwable t) { |
| 353 | + assertTrue(t instanceof ExecutionException); |
| 354 | + t = t.getCause(); |
| 355 | + assertTrue(t instanceof AppendSerializtionError); |
| 356 | + AppendSerializtionError e = (AppendSerializtionError) t; |
| 357 | + LOG.info("Found row errors on stream: " + e.getStreamName()); |
| 358 | + assertEquals( |
| 359 | + "Field foo: STRING(10) has maximum length 10 but got a value with length 12 on field foo.", |
| 360 | + e.getRowIndexToErrorMessage().get(0)); |
| 361 | + assertEquals( |
| 362 | + "Field foo: STRING(10) has maximum length 10 but got a value with length 15 on field foo.", |
| 363 | + e.getRowIndexToErrorMessage().get(2)); |
| 364 | + for (Map.Entry<Integer, String> entry : e.getRowIndexToErrorMessage().entrySet()) { |
| 365 | + LOG.info("Bad row index: " + entry.getKey() + ", has problem: " + entry.getValue()); |
| 366 | + } |
| 367 | + } |
| 368 | + assertEquals(null, actualResponse); |
| 369 | + LOG.info("Resending with three good messages"); |
| 370 | + ApiFuture<AppendRowsResponse> futureResponse1 = |
| 371 | + streamWriter.append(CreateProtoRows(new String[] {"aaa", "bbb", "ccc"}), -1); |
| 372 | + assertEquals(3, futureResponse1.get().getAppendResult().getOffset().getValue()); |
| 373 | + |
| 374 | + TableResult result = |
| 375 | + bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); |
| 376 | + Iterator<FieldValueList> iter = result.getValues().iterator(); |
| 377 | + FieldValueList currentRow = iter.next(); |
| 378 | + assertEquals("aaa", currentRow.get(0).getStringValue()); |
| 379 | + assertEquals("bbb", iter.next().get(0).getStringValue()); |
| 380 | + assertEquals("ccc", iter.next().get(0).getStringValue()); |
| 381 | + assertEquals(false, iter.hasNext()); |
| 382 | + } |
| 383 | + |
316 | 384 | @Test |
317 | 385 | public void testJsonStreamWriterWithDefaultSchema() |
318 | 386 | throws IOException, InterruptedException, ExecutionException, |
|
0 commit comments