This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 488f258

yirutang, yoshi-automation, and stephaniewang526 authored
feat: better default stream support in client library (#750)
* feat: Support default stream on streamWriter and jsonStreamWriter
* add integration test
* ci(java): ignore bot users for generate-files-bot (#749)
  Depends on googleapis/repo-automation-bots#1254
  Fixes googleapis/repo-automation-bots#1096
  Source-Author: Jeff Ching <chingor@google.com>
  Source-Date: Tue Dec 15 16:16:07 2020 -0800
  Source-Repo: googleapis/synthtool
  Source-Sha: 3f67ceece7e797a5736a25488aae35405649b90b
  Source-Link: googleapis/synthtool@3f67cee
* chore: synthtool changes (#746)
  Changes without context: autosynth cannot find the source of changes triggered by earlier changes in this repository, or by version upgrades to tools such as linters.
* chore: migrate java-bigquerystorage to the Java microgenerator
  Committer: @miraleung
  PiperOrigin-RevId: 345311069
  Source-Author: Google APIs <noreply@google.com>
  Source-Date: Wed Dec 2 14:17:15 2020 -0800
  Source-Repo: googleapis/googleapis
  Source-Sha: e39e42f368d236203a774ee994fcb4d730c33a83
  Source-Link: googleapis/googleapis@e39e42f
* feat!: Updates to BigQuery Write API V1Beta2 public interface. This includes breaking changes to the API; this is acceptable because the API is not officially launched yet.
  PiperOrigin-RevId: 345469340
  Source-Author: Google APIs <noreply@google.com>
  Source-Date: Thu Dec 3 09:33:11 2020 -0800
  Source-Repo: googleapis/googleapis
  Source-Sha: b53c4d98aab1eae3dac90b37019dede686782f13
  Source-Link: googleapis/googleapis@b53c4d9
* fix: Update gapic-generator-java to 0.0.7
  Committer: @miraleung
  PiperOrigin-RevId: 345476969
  Source-Author: Google APIs <noreply@google.com>
  Source-Date: Thu Dec 3 10:07:32 2020 -0800
  Source-Repo: googleapis/googleapis
  Source-Sha: 7be2c821dd88109038c55c89f7dd48f092eeab9d
  Source-Link: googleapis/googleapis@7be2c82
* chore: rollback migrating java-bigquerystorage to the Java microgenerator
  Committer: @miraleung
  PiperOrigin-RevId: 345522380
  Source-Author: Google APIs <noreply@google.com>
  Source-Date: Thu Dec 3 13:28:07 2020 -0800
  Source-Repo: googleapis/googleapis
  Source-Sha: f8f975c7d43904e90d6c5f1684fdb6804400e641
  Source-Link: googleapis/googleapis@f8f975c
* chore: migrate java-bigquerystorage to the Java microgenerator
  Committer: @miraleung
  PiperOrigin-RevId: 346405446
  Source-Author: Google APIs <noreply@google.com>
  Source-Date: Tue Dec 8 14:03:11 2020 -0800
  Source-Repo: googleapis/googleapis
  Source-Sha: abc43060f136ce77124754a48f367102e646844a
  Source-Link: googleapis/googleapis@abc4306
* chore: update gapic-generator-java to 0.0.11
  Committer: @miraleung
  PiperOrigin-RevId: 347036369
  Source-Author: Google APIs <noreply@google.com>
  Source-Date: Fri Dec 11 11:13:47 2020 -0800
  Source-Repo: googleapis/googleapis
  Source-Sha: 6d65640b1fcbdf26ea76cb720de0ac138cae9bed
  Source-Link: googleapis/googleapis@6d65640
* chore: update gapic-generator-java to 0.0.13
  Committer: @miraleung
  PiperOrigin-RevId: 347849179
  Source-Author: Google APIs <noreply@google.com>
  Source-Date: Wed Dec 16 10:28:38 2020 -0800
  Source-Repo: googleapis/googleapis
  Source-Sha: 360a0e177316b7e9811f2ccbbef11e5f83377f3f
  Source-Link: googleapis/googleapis@360a0e1
* fix flushall test

Co-authored-by: Yoshi Automation Bot <yoshi-automation@google.com>
Co-authored-by: Stephanie Wang <stephaniewang526@users.noreply.github.com>
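The headline change is that the writer builders now accept either a full write-stream name or, when default-stream mode is enabled, a bare table name. A minimal, self-contained sketch of the two name shapes this commit's Javadoc documents, using plain JDK regex (the resource names below are made up for illustration, not real resources):

```java
import java.util.regex.Pattern;

public class StreamNamePatterns {
    // An explicit write-stream name: the only form accepted before this commit.
    static final Pattern STREAM_NAME =
        Pattern.compile("projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+");

    // A bare table name: accepted once createDefaultStream() is set on the builder.
    static final Pattern TABLE_NAME =
        Pattern.compile("projects/[^/]+/datasets/[^/]+/tables/[^/]+");

    public static void main(String[] args) {
        String stream = "projects/my-project/datasets/my_dataset/tables/my_table/streams/s1";
        String table = "projects/my-project/datasets/my_dataset/tables/my_table";

        System.out.println(STREAM_NAME.matcher(stream).matches()); // true
        System.out.println(TABLE_NAME.matcher(table).matches());   // true
        // A bare table name does not match the stream pattern, which is why the
        // builder gains an explicit default-stream flag instead of a regex check.
        System.out.println(STREAM_NAME.matcher(table).matches());  // false
    }
}
```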
1 parent 0988105 commit 488f258

9 files changed

Lines changed: 372 additions & 66 deletions

File tree

.github/generated-files-bot.yml

Lines changed: 0 additions & 4 deletions
```diff
@@ -5,7 +5,3 @@ externalManifests:
   - type: json
     file: '.github/readme/synth.metadata/synth.metadata'
     jsonpath: '$.generatedFiles[*]'
-ignoreAuthors:
-  - 'renovate-bot'
-  - 'yoshi-automation'
-  - 'release-please[bot]'
```
google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BQV2ToBQStorageConverter.java

Lines changed: 85 additions & 0 deletions
```java
/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.cloud.bigquery.storage.v1beta2;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.common.collect.ImmutableMap;

/** Converts structure from the BigQuery v2 API to the BigQueryStorage API. */
public class BQV2ToBQStorageConverter {
  private static ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
      ImmutableMap.of(
          Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
          Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
          Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);

  private static ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
      new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
          .put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
          .put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
          .put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
          .put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
          .put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
          .put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
          .put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
          .put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
          .put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
          .put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
          .put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
          .put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
          .build();

  /**
   * Converts a BigQuery v2 table schema to a BigQuery Storage API table schema.
   *
   * @param schema the BigQuery v2 table schema
   * @return the BigQuery Storage API table schema
   */
  public static TableSchema ConvertTableSchema(Schema schema) {
    TableSchema.Builder result = TableSchema.newBuilder();
    for (int i = 0; i < schema.getFields().size(); i++) {
      result.addFields(i, ConvertFieldSchema(schema.getFields().get(i)));
    }
    return result.build();
  }

  /**
   * Converts a BigQuery v2 field schema to a BigQuery Storage API field schema.
   *
   * @param field the BigQuery v2 field schema
   * @return the BigQuery Storage API field schema
   */
  public static TableFieldSchema ConvertFieldSchema(Field field) {
    TableFieldSchema.Builder result = TableFieldSchema.newBuilder();
    if (field.getMode() == null) {
      // A missing mode in the v2 schema defaults to NULLABLE.
      field = field.toBuilder().setMode(Field.Mode.NULLABLE).build();
    }
    result.setMode(BQTableSchemaModeMap.get(field.getMode()));
    result.setName(field.getName());
    result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType()));
    if (field.getDescription() != null) {
      result.setDescription(field.getDescription());
    }
    if (field.getSubFields() != null) {
      for (int i = 0; i < field.getSubFields().size(); i++) {
        result.addFields(i, ConvertFieldSchema(field.getSubFields().get(i)));
      }
    }
    return result.build();
  }
}
```
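The converter above maps a missing v2 field mode to NULLABLE before the table lookup. A self-contained sketch of that defaulting rule with plain JDK maps (the enums here are illustrative stand-ins for the library's `Field.Mode` and `TableFieldSchema.Mode`, not the real classes):

```java
import java.util.Map;

public class ModeDefaultingSketch {
    enum V2Mode { NULLABLE, REPEATED, REQUIRED }       // stand-in for Field.Mode
    enum StorageMode { NULLABLE, REPEATED, REQUIRED }  // stand-in for TableFieldSchema.Mode

    static final Map<V2Mode, StorageMode> MODE_MAP = Map.of(
        V2Mode.NULLABLE, StorageMode.NULLABLE,
        V2Mode.REPEATED, StorageMode.REPEATED,
        V2Mode.REQUIRED, StorageMode.REQUIRED);

    // Mirrors ConvertFieldSchema: a null v2 mode is treated as NULLABLE,
    // so the map lookup can never return null for a well-formed field.
    static StorageMode convertMode(V2Mode mode) {
        return MODE_MAP.get(mode == null ? V2Mode.NULLABLE : mode);
    }

    public static void main(String[] args) {
        System.out.println(convertMode(null));            // NULLABLE
        System.out.println(convertMode(V2Mode.REQUIRED)); // REQUIRED
    }
}
```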

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java

Lines changed: 61 additions & 29 deletions
```diff
@@ -21,14 +21,14 @@
 import com.google.api.gax.core.ExecutorProvider;
 import com.google.api.gax.retrying.RetrySettings;
 import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.bigquery.Schema;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Int64Value;
 import com.google.protobuf.Message;
 import java.io.IOException;
 import java.util.logging.Logger;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.json.JSONArray;
@@ -62,21 +62,15 @@ public class JsonStreamWriter implements AutoCloseable {
   private JsonStreamWriter(Builder builder)
       throws Descriptors.DescriptorValidationException, IllegalArgumentException, IOException,
           InterruptedException {
-    Matcher matcher = streamPattern.matcher(builder.streamName);
-    if (!matcher.matches()) {
-      throw new IllegalArgumentException("Invalid stream name: " + builder.streamName);
-    }
-
-    this.streamName = builder.streamName;
     this.client = builder.client;
     this.descriptor =
         BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);
 
     StreamWriter.Builder streamWriterBuilder;
     if (this.client == null) {
-      streamWriterBuilder = StreamWriter.newBuilder(builder.streamName);
+      streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName);
     } else {
-      streamWriterBuilder = StreamWriter.newBuilder(builder.streamName, builder.client);
+      streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName, builder.client);
     }
     setStreamWriterSettings(
         streamWriterBuilder,
@@ -85,9 +79,12 @@ private JsonStreamWriter(Builder builder)
         builder.batchingSettings,
         builder.retrySettings,
         builder.executorProvider,
-        builder.endpoint);
+        builder.endpoint,
+        builder.createDefaultStream);
     this.streamWriter = streamWriterBuilder.build();
+    this.streamName = this.streamWriter.getStreamNameString();
   }
+
   /**
    * Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
    * data to protobuf messages, then using StreamWriter's append() to write the data. If there is a
@@ -126,12 +123,12 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
     synchronized (this) {
       data.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
       data.setRows(rowsBuilder.build());
+      AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
+      if (offset >= 0) {
+        request.setOffset(Int64Value.of(offset));
+      }
       final ApiFuture<AppendRowsResponse> appendResponseFuture =
-          this.streamWriter.append(
-              AppendRowsRequest.newBuilder()
-                  .setProtoRows(data.build())
-                  .setOffset(Int64Value.of(offset))
-                  .build());
+          this.streamWriter.append(request.build());
       return appendResponseFuture;
     }
   }
@@ -179,7 +176,8 @@ private void setStreamWriterSettings(
       @Nullable BatchingSettings batchingSettings,
       @Nullable RetrySettings retrySettings,
       @Nullable ExecutorProvider executorProvider,
-      @Nullable String endpoint) {
+      @Nullable String endpoint,
+      Boolean createDefaultStream) {
     if (channelProvider != null) {
       builder.setChannelProvider(channelProvider);
     }
@@ -198,6 +196,9 @@ private void setStreamWriterSettings(
     if (endpoint != null) {
       builder.setEndpoint(endpoint);
     }
+    if (createDefaultStream) {
+      builder.createDefaultStream();
+    }
     JsonStreamWriterOnSchemaUpdateRunnable jsonStreamWriterOnSchemaUpdateRunnable =
         new JsonStreamWriterOnSchemaUpdateRunnable();
     jsonStreamWriterOnSchemaUpdateRunnable.setJsonStreamWriter(this);
@@ -217,34 +218,53 @@ void setTableSchema(TableSchema tableSchema) {
    * newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
    * StreamWriter by default.
    *
-   * @param streamName name of the stream that must follow
+   * @param streamOrTableName name of the stream that must follow
+   *     "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or, if it is a default stream
+   *     (createDefaultStream is true on the builder), then the name here should be a table name
+   *     "projects/[^/]+/datasets/[^/]+/tables/[^/]+"
+   * @param tableSchema The schema of the table when the stream was created, which is passed back
+   *     through {@code WriteStream}
+   * @return Builder
+   */
+  public static Builder newBuilder(String streamOrTableName, TableSchema tableSchema) {
+    Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
+    Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
+    return new Builder(streamOrTableName, tableSchema, null);
+  }
+
+  /**
+   * newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
+   * StreamWriter by default.
+   *
+   * @param streamOrTableName name of the stream that must follow
    *     "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"
    * @param tableSchema The schema of the table when the stream was created, which is passed back
    *     through {@code WriteStream}
    * @return Builder
    */
-  public static Builder newBuilder(String streamName, TableSchema tableSchema) {
-    Preconditions.checkNotNull(streamName, "StreamName is null.");
+  public static Builder newBuilder(String streamOrTableName, Schema tableSchema) {
+    Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
     Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
-    return new Builder(streamName, tableSchema, null);
+    return new Builder(
+        streamOrTableName, BQV2ToBQStorageConverter.ConvertTableSchema(tableSchema), null);
   }
 
   /**
    * newBuilder that constructs a JsonStreamWriter builder.
    *
-   * @param streamName name of the stream that must follow
+   * @param streamOrTableName name of the stream that must follow
    *     "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"
    * @param tableSchema The schema of the table when the stream was created, which is passed back
    *     through {@code WriteStream}
    * @param client
   * @return Builder
   */
   public static Builder newBuilder(
-      String streamName, TableSchema tableSchema, BigQueryWriteClient client) {
-    Preconditions.checkNotNull(streamName, "StreamName is null.");
+      String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
+    Preconditions.checkNotNull(streamOrTableName, "StreamName is null.");
     Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
     Preconditions.checkNotNull(client, "BigQuery client is null.");
-    return new Builder(streamName, tableSchema, client);
+    return new Builder(streamOrTableName, tableSchema, client);
   }
 
   /** Closes the underlying StreamWriter. */
@@ -287,7 +307,7 @@ public void run() {
   }
 
   public static final class Builder {
-    private String streamName;
+    private String streamOrTableName;
     private BigQueryWriteClient client;
     private TableSchema tableSchema;
@@ -297,17 +317,19 @@ public static final class Builder {
     private RetrySettings retrySettings;
     private ExecutorProvider executorProvider;
     private String endpoint;
+    private boolean createDefaultStream = false;
 
     /**
      * Constructor for JsonStreamWriter's Builder
      *
-     * @param streamName name of the stream that must follow
-     *     "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"
+     * @param streamOrTableName name of the stream that must follow
+     *     "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or
+     *     "projects/[^/]+/datasets/[^/]+/tables/[^/]+/_default"
      * @param tableSchema schema used to convert Json to proto messages.
      * @param client
      */
-    private Builder(String streamName, TableSchema tableSchema, BigQueryWriteClient client) {
-      this.streamName = streamName;
+    private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
+      this.streamOrTableName = streamOrTableName;
       this.tableSchema = tableSchema;
       this.client = client;
     }
@@ -371,6 +393,16 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
       return this;
     }
 
+    /**
+     * If it is writing to a default stream.
+     *
+     * @return Builder
+     */
+    public Builder createDefaultStream() {
+      this.createDefaultStream = true;
+      return this;
+    }
+
     /**
      * Setter for the underlying StreamWriter's Endpoint.
      *
```
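The rewritten append() in the JsonStreamWriter diff only attaches an offset when the caller passes a non-negative value, since appends to the default stream carry no offset. A stand-alone sketch of that sentinel rule in plain Java (`Request` here is an illustrative stand-in, not the real `AppendRowsRequest` protobuf builder):

```java
public class OffsetRule {
    // Stand-in for a protobuf message with an optional offset field.
    static final class Request {
        final boolean hasOffset;
        final long offset;
        Request(boolean hasOffset, long offset) {
            this.hasOffset = hasOffset;
            this.offset = offset;
        }
    }

    // Mirrors the diff: set the offset field only when offset >= 0, so callers
    // writing to the default stream can pass -1 to omit the offset entirely.
    static Request buildRequest(long offset) {
        return offset >= 0 ? new Request(true, offset) : new Request(false, 0L);
    }

    public static void main(String[] args) {
        System.out.println(buildRequest(7L).hasOffset);  // true
        System.out.println(buildRequest(-1L).hasOffset); // false
    }
}
```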

0 commit comments
