This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit de2cb8c

Direct Writer 2
1 parent 39ea964 commit de2cb8c

6 files changed

Lines changed: 124 additions & 86 deletions

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java

Lines changed: 67 additions & 59 deletions
@@ -10,25 +10,21 @@
 import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoRows;
 import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoSchema;
 import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest;
-import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
-import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
 import com.google.protobuf.MessageLite;
-
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.logging.Logger;
 
 /**
- * Writer that can help user to write data to BigQuery. This is a simplified version of the Write API.
- * For users writing with COMMITTED stream and don't care about row deduplication, it is recommended to use this Writer.
+ * Writer that can help user to write data to BigQuery. This is a simplified version of the Write
+ * API. For users writing with COMMITTED stream and don't care about row deduplication, it is
+ * recommended to use this Writer.
  *
- * It supports message batching and flow control. It handles stream creation and schema update.
+ * <p>It supports message batching and flow control. It handles stream creation and schema update.
  *
  * <pre>{@code
  * DataProto data1;
@@ -52,13 +48,32 @@ public class DirectWriter implements AutoCloseable {
 
   /**
    * Constructor of DirectWriter.
-   * @param tableName Name of the table for ingest in format of 'projects/{pid}/datasets/{did}/tables/{tid}'.
-   * @param messageDescriptor The descriptor of the input message, to be used to interpret the input messages.
+   *
+   * @param tableName Name of the table for ingest in format of
+   *     'projects/{pid}/datasets/{did}/tables/{tid}'.
+   * @param messageDescriptor The descriptor of the input message, to be used to interpret the input
+   *     messages.
    */
   public DirectWriter(Builder builder) throws Exception {
     userSchema = ProtoSchemaConverter.convert(builder.userSchema);
     writerCache = WriterCache.getInstance();
-    writer = writerCache.getWriter(builder.tableName);
+    StreamWriter writer1 = writerCache.getWriter(builder.tableName);
+    // If user specifies a different setting, then create a new writer according to the setting.
+    if ((builder.batchingSettings != null
+            && builder.batchingSettings != writer1.getBatchingSettings())
+        || (builder.retrySettings != null && builder.retrySettings != writer1.getRetrySettings())) {
+      StreamWriter.Builder writerBuilder = StreamWriter.newBuilder(writer1.getStreamNameString());
+      if (builder.batchingSettings != null
+          && builder.batchingSettings != writer1.getBatchingSettings()) {
+        writerBuilder.setBatchingSettings(builder.batchingSettings);
+      }
+      if (builder.retrySettings != null && builder.retrySettings != writer1.getRetrySettings()) {
+        writerBuilder.setRetrySettings(builder.retrySettings);
+      }
+      writer1.close();
+      writer1 = writerBuilder.build();
+    }
+    writer = writer1;
   }
 
   @Override
@@ -67,11 +82,13 @@ public void close() {
   }
 
   /**
-   * The row is represented in proto buffer messages and it must be compatible to the table's schema in BigQuery.
+   * The row is represented in proto buffer messages and it must be compatible to the table's schema
+   * in BigQuery.
    *
-   * @param protoRows rows in proto buffer format. They must be compatible with the schema set on the writer.
-   * @return A future that contains the offset at which the append happened. Only when the future returns with valid
-   *     offset, then the append actually happened.
+   * @param protoRows rows in proto buffer format. They must be compatible with the schema set on
+   *     the writer.
+   * @return A future that contains the offset at which the append happened. Only when the future
+   *     returns with valid offset, then the append actually happened.
    * @throws Exception
    */
   public ApiFuture<Long> append(List<MessageLite> protoRows) throws Exception {
@@ -87,7 +104,7 @@ public ApiFuture<Long> append(List<MessageLite> protoRows) throws Exception {
 
     return ApiFutures.<Storage.AppendRowsResponse, Long>transform(
         writer.append(AppendRowsRequest.newBuilder().setProtoRows(data.build()).build()),
-        new ApiFunction<Storage.AppendRowsResponse, Long>(){
+        new ApiFunction<Storage.AppendRowsResponse, Long>() {
          @Override
          public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
            return Long.valueOf(appendRowsResponse.getOffset());
@@ -97,40 +114,55 @@ public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
   }
 
   /**
-   * After this call, messages will be appended using the new schema. Note that user is responsible to keep
-   * the schema here in sync with the table's actual schema. If they ran out of date, the append may fail.
-   * User can keep trying, until the table's new schema is picked up.
+   * After this call, messages will be appended using the new schema. Note that user is responsible
+   * to keep the schema here in sync with the table's actual schema. If they ran out of date, the
+   * append may fail. User can keep trying, until the table's new schema is picked up.
+   *
    * @param newSchema
   * @throws IOException
   * @throws InterruptedException
   */
-  public void updateSchema(Descriptors.Descriptor newSchema) throws IOException, InterruptedException {
+  public void updateSchema(Descriptors.Descriptor newSchema)
+      throws IOException, InterruptedException {
     Preconditions.checkArgument(newSchema != null);
     writer.refreshAppend();
     userSchema = ProtoSchemaConverter.convert(newSchema);
   }
 
-  public static DirectWriter.Builder newBuilder(String tableName, Descriptors.Descriptor userSchema) {
+  /** Returns the batch settings on the writer. */
+  public BatchingSettings getBatchSettings() {
+    return writer.getBatchingSettings();
+  }
+
+  /** Returns the retry settings on the writer. */
+  public RetrySettings getRetrySettings() {
+    return writer.getRetrySettings();
+  }
+
+  @VisibleForTesting
+  public int getCachedTableCount() {
+    return writerCache.cachedTableCount();
+  }
+
+  @VisibleForTesting
+  public int getCachedStreamCount(String tableName) {
+    return writerCache.cachedStreamCount(tableName);
+  }
+
+  public static DirectWriter.Builder newBuilder(
+      String tableName, Descriptors.Descriptor userSchema) {
     return new DirectWriter.Builder(tableName, userSchema);
   }
 
-  /** A builder of {@link DirectWriter}s. */
+  /** A builder of {@link DirectWriter}s.
+   * As of now, user can specify only the batch and retry settings, but not other common connection settings.
+   **/
   public static final class Builder {
     private final String tableName;
     private final Descriptors.Descriptor userSchema;
 
-    // Connection settings
-    private static final int THREADS_PER_CPU = 5;
-    ExecutorProvider executorProvider =
-        InstantiatingExecutorProvider.newBuilder()
-            .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
-            .build();
-    private CredentialsProvider credentialsProvider =
-        BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
-    TransportChannelProvider channelProvider =
-        BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
-
-    // {@code StreamWriter} settings, if null, default to the settings on {@code StreamWriter}.
+    // If null, default to the settings on the writer in the cache, which in term defaults to existing settings on
+    // {@code StreamWriter}.
     RetrySettings retrySettings = null;
     BatchingSettings batchingSettings = null;
 
@@ -139,24 +171,6 @@ private Builder(String tableName, Descriptors.Descriptor userSchema) {
       this.userSchema = Preconditions.checkNotNull(userSchema);
     }
 
-    /**
-     * {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage
-     * API endpoint.
-     *
-     * <p>For performance, this client benefits from having multiple underlying connections. See
-     * {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}.
-     */
-    public Builder setChannelProvider(TransportChannelProvider channelProvider) {
-      this.channelProvider = Preconditions.checkNotNull(channelProvider);
-      return this;
-    }
-
-    /** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */
-    public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
-      this.credentialsProvider = Preconditions.checkNotNull(credentialsProvider);
-      return this;
-    }
-
     /** Sets the {@code BatchSettings} on the writer. */
     public Builder setBatchingSettings(BatchingSettings batchingSettings) {
       this.batchingSettings = Preconditions.checkNotNull(batchingSettings);
@@ -169,12 +183,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
       return this;
     }
 
-    /** Gives the ability to set a custom executor to be used by the library. */
-    public Builder setExecutorProvider(ExecutorProvider executorProvider) {
-      this.executorProvider = Preconditions.checkNotNull(executorProvider);
-      return this;
-    }
-
     /** Builds the {@code DirectWriter}. */
     public DirectWriter build() throws Exception {
       return new DirectWriter(this);
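
A minimal usage sketch of the new DirectWriter surface shown in this diff. The project/dataset/table name, the FooProto message class, and the batching values are placeholders, not part of the commit; error handling is elided.

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.bigquery.storage.v1alpha2.DirectWriter;
import com.google.protobuf.MessageLite;
import java.util.Arrays;
import org.threeten.bp.Duration;

public class DirectWriterSample {
  public static void main(String[] args) throws Exception {
    // FooProto stands in for a generated protobuf message whose descriptor matches the table schema.
    DirectWriter writer =
        DirectWriter.newBuilder(
                "projects/my-project/datasets/my_dataset/tables/my_table",
                FooProto.getDescriptor())
            .setBatchingSettings(
                BatchingSettings.newBuilder()
                    .setElementCountThreshold(100L)
                    .setRequestByteThreshold(100 * 1024L)
                    .setDelayThreshold(Duration.ofMillis(10))
                    .build())
            .build();

    // The append only took effect once the future resolves to a valid offset.
    ApiFuture<Long> offset =
        writer.append(Arrays.<MessageLite>asList(FooProto.newBuilder().setName("a").build()));
    Long committedOffset = offset.get();

    // After a table schema change, refresh the writer's schema before appending new-shape rows.
    writer.updateSchema(FooProto.getDescriptor());
    writer.close();
  }
}

Per the constructor change above, passing batching or retry settings that differ from the cached writer's causes the cached StreamWriter to be closed and rebuilt with the requested settings.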

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java

Lines changed: 12 additions & 10 deletions
@@ -46,17 +46,16 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import org.threeten.bp.Duration;
-
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.threeten.bp.Duration;
 
 /**
  * A BigQuery Stream Writer that can be used to write data into BigQuery Table.
  *
- * This is to be used to managed streaming write when you are working with PENDING streams or want to explicitly
- * manage offset. In that most common cases when writing with COMMITTED stream without offset, please use a simpler
- * writer {@code DirectWriter}.
+ * <p>This is to be used to managed streaming write when you are working with PENDING streams or
+ * want to explicitly manage offset. In that most common cases when writing with COMMITTED stream
+ * without offset, please use a simpler writer {@code DirectWriter}.
  *
  * <p>A {@link StreamWrier} provides built-in capabilities to: handle batching of messages;
  * controlling memory utilization (through flow control); automatic connection re-establishment and
@@ -75,7 +74,8 @@
 public class StreamWriter implements AutoCloseable {
   private static final Logger LOG = Logger.getLogger(StreamWriter.class.getName());
 
-  private static String streamPatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/.*";
+  private static String streamPatternString =
+      "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/.*";
 
   private static Pattern streamPattern = Pattern.compile(streamPatternString);
 
@@ -119,7 +119,8 @@ public static long getApiMaxInflightRequests() {
   private StreamWriter(Builder builder) throws Exception {
     Matcher matcher = streamPattern.matcher(builder.streamName);
     if (!matcher.matches()) {
-      throw new InvalidArgumentException(null, GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT), false);
+      throw new InvalidArgumentException(
+          null, GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT), false);
     }
     streamName = builder.streamName;
     tableName = matcher.group(1);
@@ -809,7 +810,7 @@ public void onError(Throwable t) {
       try {
         // Establish a new connection.
         streamWriter.refreshAppend();
-      } catch (IOException e) {
+      } catch (IOException | InterruptedException e) {
        LOG.info("Failed to establish a new connection");
      }
     }
@@ -829,7 +830,7 @@ private static class MessagesBatch {
     private int batchedBytes;
     private final BatchingSettings batchingSettings;
     private Boolean attachSchema = true;
-    final private String streamName;
+    private final String streamName;
 
     private MessagesBatch(BatchingSettings batchingSettings, String streamName) {
       this.batchingSettings = batchingSettings;
@@ -839,7 +840,8 @@ private MessagesBatch(BatchingSettings batchingSettings, String streamName) {
 
     // Get all the messages out in a batch.
     private InflightBatch popBatch() {
-      InflightBatch batch = new InflightBatch(messages, batchedBytes, this.streamName, this.attachSchema);
+      InflightBatch batch =
+          new InflightBatch(messages, batchedBytes, this.streamName, this.attachSchema);
       this.attachSchema = false;
       reset();
       return batch;
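
For context on the reformatted streamPatternString above, a small self-contained sketch of how that pattern splits a write-stream resource name into the table portion that StreamWriter keeps as tableName; the stream name used here is made up.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StreamNameParseSample {
  // Same pattern as in StreamWriter: group(1) captures the parent table resource.
  private static final Pattern STREAM_PATTERN =
      Pattern.compile("(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/.*");

  public static void main(String[] args) {
    String streamName =
        "projects/my-project/datasets/my_dataset/tables/my_table/streams/stream-1";
    Matcher matcher = STREAM_PATTERN.matcher(streamName);
    if (!matcher.matches()) {
      // StreamWriter's constructor throws InvalidArgumentException in this case.
      throw new IllegalArgumentException("Not a valid write stream name: " + streamName);
    }
    // Prints: projects/my-project/datasets/my_dataset/tables/my_table
    System.out.println(matcher.group(1));
  }
}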

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java

Lines changed: 31 additions & 7 deletions
@@ -1,5 +1,6 @@
 package com.google.cloud.bigquery.storage.v1alpha2;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.Date;
 import java.util.Map;
@@ -14,19 +15,18 @@
  * minutes if not used. Code sample: WriterCache cache = WriterCache.getInstance(); StreamWriter
  * writer = cache.getWriter(); // Use... cache.returnWriter(writer);
  */
-public class StreamCache {
-  private static final Logger LOG = Logger.getLogger(StreamCache.class.getName());
+public class WriterCache {
+  private static final Logger LOG = Logger.getLogger(WriterCache.class.getName());
 
-  private static StreamCache instance;
+  private static WriterCache instance;
 
-  private Duration expireTime = Duration.ofSeconds(300);
   private ConcurrentHashMap<String, Map<String, Pair<StreamWriter, Long>>> cacheWithTimeout;
 
   private final BigQueryWriteClient stub;
   private final BigQueryWriteSettings stubSettings;
   private final CleanerThread cleanerThread;
 
-  private StreamCache() throws Exception {
+  private WriterCache(Duration expireTime) throws Exception {
     cacheWithTimeout = new ConcurrentHashMap<>();
     stubSettings = BigQueryWriteSettings.newBuilder().build();
     stub = BigQueryWriteClient.create(stubSettings);
@@ -41,9 +41,17 @@ public void run() {
         });
   }
 
-  public static StreamCache getInstance() throws Exception {
+  public static WriterCache getInstance() throws Exception {
     if (instance == null) {
-      instance = new StreamCache();
+      instance = new WriterCache(Duration.ofMinutes(5));
+    }
+    return instance;
+  }
+
+  @VisibleForTesting
+  public static WriterCache getInstance(Duration expireTime) throws Exception {
+    if (instance == null) {
+      instance = new WriterCache(expireTime);
     }
     return instance;
   }
@@ -123,6 +131,22 @@ public void returnWriter(StreamWriter writer) {
     }
   }
 
+  public int cachedTableCount() {
+    synchronized (cacheWithTimeout) {
+      return cacheWithTimeout.keySet().size();
+    }
+  }
+
+  public int cachedStreamCount(String tableName) {
+    synchronized (cacheWithTimeout) {
+      if (cacheWithTimeout.contains(tableName)) {
+        return cacheWithTimeout.get(tableName).values().size();
+      } else {
+        return 0;
+      }
+    }
+  }
+
   private class CleanerThread extends Thread {
     private long expiryInMillis;
     private ConcurrentHashMap<String, Map<String, Pair<StreamWriter, Long>>> timeMap;
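
To tie the cache changes together, a hedged sketch of how WriterCache is consumed elsewhere in this commit: the flow follows the class javadoc sample and DirectWriter's own calls, while the new counters back DirectWriter's test accessors. The table name is a placeholder, and exception handling is elided. The Duration-taking getInstance overload is test-only and, since the class is a singleton, only applies before the first instance is created.

import com.google.cloud.bigquery.storage.v1alpha2.StreamWriter;
import com.google.cloud.bigquery.storage.v1alpha2.WriterCache;

public class WriterCacheSample {
  public static void main(String[] args) throws Exception {
    String table = "projects/my-project/datasets/my_dataset/tables/my_table";

    // Flow from the class javadoc: borrow a StreamWriter for a table, use it, return it.
    WriterCache cache = WriterCache.getInstance();
    StreamWriter writer = cache.getWriter(table);
    // ... append rows through the writer ...
    cache.returnWriter(writer);

    // New introspection hooks added in this commit, used by DirectWriter's @VisibleForTesting accessors.
    System.out.println("cached tables: " + cache.cachedTableCount());
    System.out.println("cached streams for table: " + cache.cachedStreamCount(table));
  }
}
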
DirectWriterTest.java

Lines changed: 1 addition & 2 deletions

@@ -1,4 +1,3 @@
 package com.google.cloud.bigquery.storage.v1alpha2;
 
-public class DirectWriterTest {
-}
+public class DirectWriterTest {}
WriterCacheTest.java

Lines changed: 1 addition & 2 deletions

@@ -1,4 +1,3 @@
 package com.google.cloud.bigquery.storage.v1alpha2;
 
-public class WriterCacheTest {
-}
+public class WriterCacheTest {}

0 commit comments
