1818
1919// [START bigquerystorage_jsonstreamwriter_committed]
2020import com .google .api .core .ApiFuture ;
21+ import com .google .api .core .ApiFutureCallback ;
22+ import com .google .api .core .ApiFutures ;
2123import com .google .cloud .bigquery .storage .v1 .AppendRowsResponse ;
2224import com .google .cloud .bigquery .storage .v1 .BigQueryWriteClient ;
2325import com .google .cloud .bigquery .storage .v1 .CreateWriteStreamRequest ;
24- import com .google .cloud .bigquery .storage .v1 .FinalizeWriteStreamRequest ;
26+ import com .google .cloud .bigquery .storage .v1 .Exceptions ;
27+ import com .google .cloud .bigquery .storage .v1 .Exceptions .StorageException ;
28+ import com .google .cloud .bigquery .storage .v1 .FinalizeWriteStreamResponse ;
2529import com .google .cloud .bigquery .storage .v1 .JsonStreamWriter ;
2630import com .google .cloud .bigquery .storage .v1 .TableName ;
2731import com .google .cloud .bigquery .storage .v1 .WriteStream ;
32+ import com .google .common .util .concurrent .MoreExecutors ;
2833import com .google .protobuf .Descriptors .DescriptorValidationException ;
2934import java .io .IOException ;
3035import java .util .concurrent .ExecutionException ;
36+ import java .util .concurrent .Phaser ;
37+ import javax .annotation .concurrent .GuardedBy ;
3138import org .json .JSONArray ;
3239import org .json .JSONObject ;
3340
@@ -45,13 +52,60 @@ public static void runWriteCommittedStream()
4552
4653 public static void writeCommittedStream (String projectId , String datasetName , String tableName )
4754 throws DescriptorValidationException , InterruptedException , IOException {
55+ BigQueryWriteClient client = BigQueryWriteClient .create ();
56+ TableName parentTable = TableName .of (projectId , datasetName , tableName );
4857
49- try (BigQueryWriteClient client = BigQueryWriteClient .create ()) {
58+ DataWriter writer = new DataWriter ();
59+ // One time initialization.
60+ writer .initialize (parentTable , client );
61+
62+ try {
63+ // Write two batches of fake data to the stream, each with 10 JSON records. Data may be
64+ // batched up to the maximum request size:
65+ // https://cloud.google.com/bigquery/quotas#write-api-limits
66+ long offset = 0 ;
67+ for (int i = 0 ; i < 2 ; i ++) {
68+ // Create a JSON object that is compatible with the table schema.
69+ JSONArray jsonArr = new JSONArray ();
70+ for (int j = 0 ; j < 10 ; j ++) {
71+ JSONObject record = new JSONObject ();
72+ record .put ("col1" , String .format ("batch-record %03d-%03d" , i , j ));
73+ jsonArr .put (record );
74+ }
75+ writer .append (jsonArr , offset );
76+ offset += jsonArr .length ();
77+ }
78+ } catch (ExecutionException e ) {
79+ // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
80+ // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
81+ // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
82+ System .out .println ("Failed to append records. \n " + e );
83+ }
84+
85+ // Final cleanup for the stream.
86+ writer .cleanup (client );
87+ System .out .println ("Appended records successfully." );
88+ }
89+
90+ // A simple wrapper object showing how the stateful stream writer should be used.
91+ private static class DataWriter {
92+
93+ private JsonStreamWriter streamWriter ;
94+ // Track the number of in-flight requests to wait for all responses before shutting down.
95+ private final Phaser inflightRequestCount = new Phaser (1 );
96+
97+ private final Object lock = new Object ();
98+
99+ @ GuardedBy ("lock" )
100+ private RuntimeException error = null ;
101+
102+ void initialize (TableName parentTable , BigQueryWriteClient client )
103+ throws IOException , DescriptorValidationException , InterruptedException {
50104 // Initialize a write stream for the specified table.
51105 // For more information on WriteStream.Type, see:
52- // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2 /WriteStream.Type.html
106+ // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1 /WriteStream.Type.html
53107 WriteStream stream = WriteStream .newBuilder ().setType (WriteStream .Type .COMMITTED ).build ();
54- TableName parentTable = TableName . of ( projectId , datasetName , tableName );
108+
55109 CreateWriteStreamRequest createWriteStreamRequest =
56110 CreateWriteStreamRequest .newBuilder ()
57111 .setParent (parentTable .toString ())
@@ -62,37 +116,79 @@ public static void writeCommittedStream(String projectId, String datasetName, St
62116 // Use the JSON stream writer to send records in JSON format.
63117 // For more information about JsonStreamWriter, see:
64118 // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
65- try (JsonStreamWriter writer =
66- JsonStreamWriter .newBuilder (writeStream .getName (), writeStream .getTableSchema ())
67- .build ()) {
68- // Write two batches to the stream, each with 10 JSON records. A writer should be
69- // used for as much writes as possible. Creating a writer for just one write is an
70- // antipattern.
71- for (int i = 0 ; i < 2 ; i ++) {
72- // Create a JSON object that is compatible with the table schema.
73- JSONArray jsonArr = new JSONArray ();
74- for (int j = 0 ; j < 10 ; j ++) {
75- JSONObject record = new JSONObject ();
76- record .put ("col1" , String .format ("record %03d-%03d" , i , j ));
77- jsonArr .put (record );
78- }
119+ streamWriter =
120+ JsonStreamWriter .newBuilder (writeStream .getName (), writeStream .getTableSchema ()).build ();
121+ }
79122
80- // To detect duplicate records, pass the index as the record offset.
81- // To disable deduplication, omit the offset or use WriteStream.Type.DEFAULT.
82- ApiFuture <AppendRowsResponse > future = writer .append (jsonArr , /*offset=*/ i * 10 );
83- AppendRowsResponse response = future .get ();
123+ public void append (JSONArray data , long offset )
124+ throws DescriptorValidationException , IOException , ExecutionException {
125+ synchronized (this .lock ) {
126+ // If earlier appends have failed, we need to reset before continuing.
127+ if (this .error != null ) {
128+ throw this .error ;
84129 }
85- // Finalize the stream after use.
86- FinalizeWriteStreamRequest finalizeWriteStreamRequest =
87- FinalizeWriteStreamRequest .newBuilder ().setName (writeStream .getName ()).build ();
88- client .finalizeWriteStream (finalizeWriteStreamRequest );
89130 }
90- System .out .println ("Appended records successfully." );
91- } catch (ExecutionException e ) {
92- // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
93- // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
94- // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
95- System .out .println ("Failed to append records. \n " + e .toString ());
131+ // Append asynchronously for increased throughput.
132+ ApiFuture <AppendRowsResponse > future = streamWriter .append (data , offset );
133+ ApiFutures .addCallback (
134+ future , new DataWriter .AppendCompleteCallback (this ), MoreExecutors .directExecutor ());
135+ // Increase the count of in-flight requests.
136+ inflightRequestCount .register ();
137+ }
138+
139+ public void cleanup (BigQueryWriteClient client ) {
140+ // Wait for all in-flight requests to complete.
141+ inflightRequestCount .arriveAndAwaitAdvance ();
142+
143+ // Close the connection to the server.
144+ streamWriter .close ();
145+
146+ // Verify that no error occurred in the stream.
147+ synchronized (this .lock ) {
148+ if (this .error != null ) {
149+ throw this .error ;
150+ }
151+ }
152+
153+ // Finalize the stream.
154+ FinalizeWriteStreamResponse finalizeResponse =
155+ client .finalizeWriteStream (streamWriter .getStreamName ());
156+ System .out .println ("Rows written: " + finalizeResponse .getRowCount ());
157+ }
158+
159+ public String getStreamName () {
160+ return streamWriter .getStreamName ();
161+ }
162+
163+ static class AppendCompleteCallback implements ApiFutureCallback <AppendRowsResponse > {
164+
165+ private final DataWriter parent ;
166+
167+ public AppendCompleteCallback (DataWriter parent ) {
168+ this .parent = parent ;
169+ }
170+
171+ public void onSuccess (AppendRowsResponse response ) {
172+ System .out .format ("Append %d success\n " , response .getAppendResult ().getOffset ().getValue ());
173+ done ();
174+ }
175+
176+ public void onFailure (Throwable throwable ) {
177+ synchronized (this .parent .lock ) {
178+ if (this .parent .error == null ) {
179+ StorageException storageException = Exceptions .toStorageException (throwable );
180+ this .parent .error =
181+ (storageException != null ) ? storageException : new RuntimeException (throwable );
182+ }
183+ }
184+ System .out .format ("Error: %s\n " , throwable .toString ());
185+ done ();
186+ }
187+
188+ private void done () {
189+ // Reduce the count of in-flight requests.
190+ this .parent .inflightRequestCount .arriveAndDeregister ();
191+ }
96192 }
97193 }
98194}
0 commit comments