Lokasi ngalangkungan proxy:   [ UP ]  
[Ngawartoskeun bug]   [Panyetelan cookie]                
Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit ed718c1

Browse files
authored
feat: Direct writer (#165)
* feat:Direct Writer new file: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java new file: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java new file: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java new file: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java r 39ea964 feat:Direct Writer r de2cb8c feat:Direct Writer 2 pick 8e67681 feat:direct writer 3 new file: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java new file: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java new file: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java new file: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java * feat:Direct Writer 2 * feat:direct writer 3 modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/MockBigQueryWriteImpl.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java * Fix a logging * Add very basic schema compact check modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java new file: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java new file: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java * fix e2e modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
1 parent e739f5f commit ed718c1

12 files changed

Lines changed: 1211 additions & 58 deletions

File tree

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigquery.storage.v1alpha2;
17+
18+
import com.google.api.core.*;
19+
import com.google.api.gax.grpc.GrpcStatusCode;
20+
import com.google.api.gax.rpc.InvalidArgumentException;
21+
import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoRows;
22+
import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest;
23+
import com.google.common.annotations.VisibleForTesting;
24+
import com.google.common.util.concurrent.MoreExecutors;
25+
import com.google.protobuf.Descriptors;
26+
import com.google.protobuf.Message;
27+
import io.grpc.Status;
28+
import java.io.IOException;
29+
import java.util.List;
30+
import java.util.concurrent.locks.Lock;
31+
import java.util.concurrent.locks.ReentrantLock;
32+
import java.util.logging.Logger;
33+
34+
/**
35+
* Writer that can help user to write data to BigQuery. This is a simplified version of the Write
36+
* API. For users writing with COMMITTED stream and don't care about row deduplication, it is
37+
* recommended to use this Writer.
38+
*
39+
* <pre>{@code
40+
* DataProto data;
41+
* ApiFuture<Long> response = DirectWriter.<DataProto>append("projects/pid/datasets/did/tables/tid", Arrays.asList(data1));
42+
* }</pre>
43+
*
44+
* <p>{@link DirectWriter} will use the credentials set on the channel, which uses application
45+
* default credentials through {@link GoogleCredentials#getApplicationDefault} by default.
46+
*/
47+
public class DirectWriter {
48+
private static final Logger LOG = Logger.getLogger(DirectWriter.class.getName());
49+
private static WriterCache cache = null;
50+
private static Lock cacheLock = new ReentrantLock();
51+
52+
/**
53+
* Append rows to the given table.
54+
*
55+
* @param tableName table name in the form of "projects/{pName}/datasets/{dName}/tables/{tName}"
56+
* @param protoRows rows in proto buffer format.
57+
* @return A future that contains the offset at which the append happened. Only when the future
58+
* returns with valid offset, then the append actually happened.
59+
* @throws IOException, InterruptedException, InvalidArgumentException
60+
*/
61+
public static <T extends Message> ApiFuture<Long> append(String tableName, List<T> protoRows)
62+
throws IOException, InterruptedException, InvalidArgumentException {
63+
if (protoRows.isEmpty()) {
64+
throw new InvalidArgumentException(
65+
new Exception("Empty rows are not allowed"),
66+
GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT),
67+
false);
68+
}
69+
try {
70+
cacheLock.lock();
71+
if (cache == null) {
72+
cache = WriterCache.getInstance();
73+
}
74+
} finally {
75+
cacheLock.unlock();
76+
}
77+
78+
StreamWriter writer = cache.getTableWriter(tableName, protoRows.get(0).getDescriptorForType());
79+
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
80+
Descriptors.Descriptor descriptor = null;
81+
for (Message protoRow : protoRows) {
82+
rowsBuilder.addSerializedRows(protoRow.toByteString());
83+
}
84+
85+
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
86+
data.setWriterSchema(ProtoSchemaConverter.convert(protoRows.get(0).getDescriptorForType()));
87+
data.setRows(rowsBuilder.build());
88+
89+
return ApiFutures.<Storage.AppendRowsResponse, Long>transform(
90+
writer.append(AppendRowsRequest.newBuilder().setProtoRows(data.build()).build()),
91+
new ApiFunction<Storage.AppendRowsResponse, Long>() {
92+
@Override
93+
public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
94+
return Long.valueOf(appendRowsResponse.getOffset());
95+
}
96+
},
97+
MoreExecutors.directExecutor());
98+
}
99+
100+
@VisibleForTesting
101+
public static void testSetStub(
102+
BigQueryWriteClient stub, int maxTableEntry, SchemaCompact schemaCheck) {
103+
cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck);
104+
}
105+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigquery.storage.v1alpha2;
17+
18+
import com.google.cloud.bigquery.BigQuery;
19+
import com.google.cloud.bigquery.Schema;
20+
import com.google.cloud.bigquery.Table;
21+
import com.google.cloud.bigquery.TableId;
22+
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
23+
import com.google.common.annotations.VisibleForTesting;
24+
import com.google.protobuf.Descriptors;
25+
import java.util.regex.Matcher;
26+
import java.util.regex.Pattern;
27+
28+
/**
29+
* A class that checks the schema compatibility between user schema in proto descriptor and Bigquery
30+
* table schema. If this check is passed, then user can write to BigQuery table using the user
31+
* schema, otherwise the write will fail.
32+
*
33+
* <p>The implementation as of now is not complete, which measn, if this check passed, there is
34+
* still a possbility of writing will fail.
35+
*/
36+
public class SchemaCompact {
37+
private BigQuery bigquery;
38+
private static SchemaCompact compact;
39+
private static String tablePatternString = "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)";
40+
private static Pattern tablePattern = Pattern.compile(tablePatternString);
41+
42+
private SchemaCompact(BigQuery bigquery) {
43+
this.bigquery = bigquery;
44+
}
45+
46+
/**
47+
* Gets a singleton {code SchemaCompact} object.
48+
*
49+
* @return
50+
*/
51+
public static SchemaCompact getInstance() {
52+
if (compact == null) {
53+
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
54+
compact = new SchemaCompact(bigqueryHelper.getOptions().getService());
55+
}
56+
return compact;
57+
}
58+
59+
/**
60+
* Gets a {code SchemaCompact} object with custom BigQuery stub.
61+
*
62+
* @param bigquery
63+
* @return
64+
*/
65+
@VisibleForTesting
66+
public static SchemaCompact getInstance(BigQuery bigquery) {
67+
return new SchemaCompact(bigquery);
68+
}
69+
70+
private TableId getTableId(String tableName) {
71+
Matcher matcher = tablePattern.matcher(tableName);
72+
if (!matcher.matches() || matcher.groupCount() != 3) {
73+
throw new IllegalArgumentException("Invalid table name: " + tableName);
74+
}
75+
return TableId.of(matcher.group(1), matcher.group(2), matcher.group(3));
76+
}
77+
78+
/**
79+
* Checks if the userSchema is compatible with the table's current schema for writing. The current
80+
* implementatoin is not complete. If the check failed, the write couldn't succeed.
81+
*
82+
* @param tableName The name of the table to write to.
83+
* @param userSchema The schema user uses to append data.
84+
* @throws IllegalArgumentException the check failed.
85+
*/
86+
public void check(String tableName, Descriptors.Descriptor userSchema)
87+
throws IllegalArgumentException {
88+
Table table = bigquery.getTable(getTableId(tableName));
89+
Schema schema = table.getDefinition().getSchema();
90+
// TODO: We only have very limited check here. More checks to be added.
91+
if (schema.getFields().size() != userSchema.getFields().size()) {
92+
throw new IllegalArgumentException(
93+
"User schema doesn't have expected field number with BigQuery table schema, expected: "
94+
+ schema.getFields().size()
95+
+ " actual: "
96+
+ userSchema.getFields().size());
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)