From 43fc284e00ddbc9a018d734e3f6f09c82ebd92d4 Mon Sep 17 00:00:00 2001 From: Martin Mladenovski Date: Fri, 7 Feb 2020 08:27:06 -0800 Subject: [PATCH 1/4] feat: add an enhanced layer for BigQuery Storage v1 client (#66) * feat: seed v1 with v1beta2 enhanced layer files This commit makes it easier for reviewing later changes that update the enhanced layer to v1. * feat: add an enhanced layer for BigQuery Storage v1 client v1beta2 and v1 APIs are the same, with the only difference in the version. --- .../storage/v1/BigQueryReadClient.java | 376 ++++++++++++++++++ .../storage/v1/BigQueryReadSettings.java | 202 ++++++++++ .../v1/stub/EnhancedBigQueryReadStub.java | 121 ++++++ .../EnhancedBigQueryReadStubSettings.java | 232 +++++++++++ .../readrows/ReadRowsResumptionStrategy.java | 72 ++++ .../v1/stub/readrows/package-info.java | 16 + .../storage/v1/BigQueryReadClientTest.java | 165 ++++++++ .../EnhancedBigQueryReadStubSettingsTest.java | 142 +++++++ .../storage/v1/stub/ResourceHeaderTest.java | 140 +++++++ .../v1/stub/readrows/ReadRowsRetryTest.java | 243 +++++++++++ 10 files changed, 1709 insertions(+) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettings.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsResumptionStrategy.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/package-info.java create mode 100644 
google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettingsTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsRetryTest.java diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java new file mode 100644 index 0000000000..e3fbf56165 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java @@ -0,0 +1,376 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.core.BetaApi; +import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStub; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Service Description: BigQuery Read API. + * + *

The Read API can be used to read data from BigQuery. + * + *

This class provides the ability to make remote calls to the backing service through method + * calls that map to API methods. Sample code to get started: + * + *

+ * 
+ * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+ *   String parent = "";
+ *   ReadSession readSession = ReadSession.newBuilder().build();
+ *   int maxStreamCount = 0;
+ *   ReadSession response = bigQueryReadClient.createReadSession(parent, readSession, maxStreamCount);
+ * }
+ * 
+ * 
+ * + *

Note: close() needs to be called on the BigQueryReadClient object to clean up resources such + * as threads. In the example above, try-with-resources is used, which automatically calls close(). + * + *

The surface of this class includes several types of Java methods for each of the API's + * methods: + * + *

    + *
  1. A "flattened" method. With this type of method, the fields of the request type have been + * converted into function parameters. It may be the case that not all fields are available as + * parameters, and not every API method will have a flattened method entry point. + *
  2. A "request object" method. This type of method only takes one parameter, a request object, + * which must be constructed before the call. Not every API method will have a request object + * method. + *
  3. A "callable" method. This type of method takes no parameters and returns an immutable API + * callable object, which can be used to initiate calls to the service. + *
+ * + *

See the individual methods for example code. + * + *

Many parameters require resource names to be formatted in a particular way. To assist with + * these names, this class includes a format method for each type of name, and additionally a parse + * method to extract the individual identifiers contained within names that are returned. + * + *

This class can be customized by passing in a custom instance of BigQueryReadSettings to + * create(). For example: + * + *

To customize credentials: + * + *

+ * 
+ * BigQueryReadSettings bigQueryReadSettings =
+ *     BigQueryReadSettings.newBuilder()
+ *         .setCredentialsProvider(FixedCredentialsProvider.create(myCredentials))
+ *         .build();
+ * BigQueryReadClient bigQueryReadClient =
+ *     BigQueryReadClient.create(bigQueryReadSettings);
+ * 
+ * 
+ * + * To customize the endpoint: + * + *
+ * 
+ * BigQueryReadSettings bigQueryReadSettings =
+ *     BigQueryReadSettings.newBuilder().setEndpoint(myEndpoint).build();
+ * BigQueryReadClient bigQueryReadClient =
+ *     BigQueryReadClient.create(bigQueryReadSettings);
+ * 
+ * 
+ */ +@BetaApi +public class BigQueryReadClient implements BackgroundResource { + private final BigQueryReadSettings settings; + private final EnhancedBigQueryReadStub stub; + + /** Constructs an instance of BigQueryReadClient with default settings. */ + public static final BigQueryReadClient create() throws IOException { + return create(BigQueryReadSettings.newBuilder().build()); + } + + /** + * Constructs an instance of BigQueryReadClient, using the given settings. The channels are + * created based on the settings passed in, or defaults for any settings that are not set. + */ + public static final BigQueryReadClient create(BigQueryReadSettings settings) throws IOException { + return new BigQueryReadClient(settings); + } + + /** + * Constructs an instance of BigQueryReadClient, using the given stub for making calls. This is + * for advanced usage - prefer to use BigQueryReadSettings}. + */ + @BetaApi("A restructuring of stub classes is planned, so this may break in the future") + public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) { + return new BigQueryReadClient(stub); + } + + /** + * Constructs an instance of BigQueryReadClient, using the given settings. This is protected so + * that it is easy to make a subclass, but otherwise, the static factory methods should be + * preferred. 
+ */ + protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException { + this.settings = settings; + this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings()); + } + + @BetaApi("A restructuring of stub classes is planned, so this may break in the future") + protected BigQueryReadClient(EnhancedBigQueryReadStub stub) { + this.settings = null; + this.stub = stub; + } + + public final BigQueryReadSettings getSettings() { + return settings; + } + + @BetaApi("A restructuring of stub classes is planned, so this may break in the future") + public EnhancedBigQueryReadStub getStub() { + return stub; + } + + /** + * Creates a new read session. A read session divides the contents of a BigQuery table into one or + * more streams, which can then be used to read data from the table. The read session also + * specifies properties of the data to be read, such as a list of columns or a push-down filter + * describing the rows to be returned. + * + *

A particular row can be read by at most one stream. When the caller has reached the end of + * each stream in the session, then all the data in the table has been read. + * + *

Data is assigned to each stream such that roughly the same number of rows can be read from + * each stream. Because the server-side unit for assigning data is collections of rows, the API + * does not guarantee that each stream will return the same number or rows. Additionally, the + * limits are enforced based on the number of pre-filtered rows, so some filters can lead to + * lopsided assignments. + * + *

Read sessions automatically expire 24 hours after they are created and do not require manual + * clean-up by the caller. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+   *   String parent = "";
+   *   ReadSession readSession = ReadSession.newBuilder().build();
+   *   int maxStreamCount = 0;
+   *   ReadSession response = bigQueryReadClient.createReadSession(parent, readSession, maxStreamCount);
+   * }
+   * 
+ * + * @param parent Required. The request project that owns the session, in the form of + * `projects/{project_id}`. + * @param readSession Required. Session to be created. + * @param maxStreamCount Max initial number of streams. If unset or zero, the server will provide + * a value of streams so as to produce reasonable throughput. Must be non-negative. The number + * of streams may be lower than the requested number, depending on the amount parallelism that + * is reasonable for the table. Error will be returned if the max count is greater than the + * current system max limit of 1,000. + *

Streams must be read starting from offset 0. + * @throws com.google.api.gax.rpc.ApiException if the remote call fails + */ + public final ReadSession createReadSession( + String parent, ReadSession readSession, int maxStreamCount) { + + CreateReadSessionRequest request = + CreateReadSessionRequest.newBuilder() + .setParent(parent) + .setReadSession(readSession) + .setMaxStreamCount(maxStreamCount) + .build(); + return createReadSession(request); + } + + /** + * Creates a new read session. A read session divides the contents of a BigQuery table into one or + * more streams, which can then be used to read data from the table. The read session also + * specifies properties of the data to be read, such as a list of columns or a push-down filter + * describing the rows to be returned. + * + *

A particular row can be read by at most one stream. When the caller has reached the end of + * each stream in the session, then all the data in the table has been read. + * + *

Data is assigned to each stream such that roughly the same number of rows can be read from + * each stream. Because the server-side unit for assigning data is collections of rows, the API + * does not guarantee that each stream will return the same number or rows. Additionally, the + * limits are enforced based on the number of pre-filtered rows, so some filters can lead to + * lopsided assignments. + * + *

Read sessions automatically expire 24 hours after they are created and do not require manual + * clean-up by the caller. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+   *   CreateReadSessionRequest request = CreateReadSessionRequest.newBuilder().build();
+   *   ReadSession response = bigQueryReadClient.createReadSession(request);
+   * }
+   * 
+ * + * @param request The request object containing all of the parameters for the API call. + * @throws com.google.api.gax.rpc.ApiException if the remote call fails + */ + public final ReadSession createReadSession(CreateReadSessionRequest request) { + return createReadSessionCallable().call(request); + } + + /** + * Creates a new read session. A read session divides the contents of a BigQuery table into one or + * more streams, which can then be used to read data from the table. The read session also + * specifies properties of the data to be read, such as a list of columns or a push-down filter + * describing the rows to be returned. + * + *

A particular row can be read by at most one stream. When the caller has reached the end of + * each stream in the session, then all the data in the table has been read. + * + *

Data is assigned to each stream such that roughly the same number of rows can be read from + * each stream. Because the server-side unit for assigning data is collections of rows, the API + * does not guarantee that each stream will return the same number or rows. Additionally, the + * limits are enforced based on the number of pre-filtered rows, so some filters can lead to + * lopsided assignments. + * + *

Read sessions automatically expire 24 hours after they are created and do not require manual + * clean-up by the caller. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+   *   CreateReadSessionRequest request = CreateReadSessionRequest.newBuilder().build();
+   *   ApiFuture<ReadSession> future = bigQueryReadClient.createReadSessionCallable().futureCall(request);
+   *   // Do something
+   *   ReadSession response = future.get();
+   * }
+   * 
+ */ + public final UnaryCallable createReadSessionCallable() { + return stub.createReadSessionCallable(); + } + + /** + * Reads rows from the stream in the format prescribed by the ReadSession. Each response contains + * one or more table rows, up to a maximum of 100 MiB per response; read requests which attempt to + * read individual rows larger than 100 MiB will fail. + * + *

Each request also returns a set of stream statistics reflecting the current state of the + * stream. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+   *   ReadRowsRequest request = ReadRowsRequest.newBuilder().build();
+   *
+   *   ServerStream<ReadRowsResponse> stream = bigQueryReadClient.readRowsCallable().call(request);
+   *   for (ReadRowsResponse response : stream) {
+   *     // Do something when receive a response
+   *   }
+   * }
+   * 
+ */ + public final ServerStreamingCallable readRowsCallable() { + return stub.readRowsCallable(); + } + + /** + * Splits a given `ReadStream` into two `ReadStream` objects. These `ReadStream` objects are + * referred to as the primary and the residual streams of the split. The original `ReadStream` can + * still be read from in the same manner as before. Both of the returned `ReadStream` objects can + * also be read from, and the rows returned by both child streams will be the same as the rows + * read from the original stream. + * + *

Moreover, the two child streams will be allocated back-to-back in the original `ReadStream`. + * Concretely, it is guaranteed that for streams original, primary, and residual, that + * original[0-j] = primary[0-j] and original[j-n] = residual[0-m] once the streams have been read + * to completion. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+   *   SplitReadStreamRequest request = SplitReadStreamRequest.newBuilder().build();
+   *   SplitReadStreamResponse response = bigQueryReadClient.splitReadStream(request);
+   * }
+   * 
+ * + * @param request The request object containing all of the parameters for the API call. + * @throws com.google.api.gax.rpc.ApiException if the remote call fails + */ + public final SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request) { + return splitReadStreamCallable().call(request); + } + + /** + * Splits a given `ReadStream` into two `ReadStream` objects. These `ReadStream` objects are + * referred to as the primary and the residual streams of the split. The original `ReadStream` can + * still be read from in the same manner as before. Both of the returned `ReadStream` objects can + * also be read from, and the rows returned by both child streams will be the same as the rows + * read from the original stream. + * + *

Moreover, the two child streams will be allocated back-to-back in the original `ReadStream`. + * Concretely, it is guaranteed that for streams original, primary, and residual, that + * original[0-j] = primary[0-j] and original[j-n] = residual[0-m] once the streams have been read + * to completion. + * + *

Sample code: + * + *


+   * try (BigQueryReadClient bigQueryReadClient = BigQueryReadClient.create()) {
+   *   SplitReadStreamRequest request = SplitReadStreamRequest.newBuilder().build();
+   *   ApiFuture<SplitReadStreamResponse> future = bigQueryReadClient.splitReadStreamCallable().futureCall(request);
+   *   // Do something
+   *   SplitReadStreamResponse response = future.get();
+   * }
+   * 
+ */ + public final UnaryCallable + splitReadStreamCallable() { + return stub.splitReadStreamCallable(); + } + + @Override + public final void close() { + stub.close(); + } + + @Override + public void shutdown() { + stub.shutdown(); + } + + @Override + public boolean isShutdown() { + return stub.isShutdown(); + } + + @Override + public boolean isTerminated() { + return stub.isTerminated(); + } + + @Override + public void shutdownNow() { + stub.shutdownNow(); + } + + @Override + public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { + return stub.awaitTermination(duration, unit); + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java new file mode 100644 index 0000000000..fcf02a2331 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java @@ -0,0 +1,202 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.core.ApiFunction; +import com.google.api.core.BetaApi; +import com.google.api.gax.core.GoogleCredentialsProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ClientSettings; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings; +import java.io.IOException; +import java.util.List; + +/** + * Settings class to configure an instance of {@link BigQueryReadClient}. + * + *

The default instance has everything set to sensible defaults: + * + *

    + *
  • The default service address (bigquerystorage.googleapis.com) and default port (443) are + * used. + *
  • Credentials are acquired automatically through Application Default Credentials. + *
  • Retries are configured for idempotent methods but not for non-idempotent methods. + *
+ * + *

The builder of this class is recursive, so contained classes are themselves builders. When + * build() is called, the tree of builders is called to create the complete settings object. + * + *

For example, to set the total timeout of createReadSession to 30 seconds: + * + *

+ * 
+ * BigQueryReadSettings.Builder bigQueryReadSettingsBuilder =
+ *     BigQueryReadSettings.newBuilder();
+ * bigQueryReadSettingsBuilder.createReadSessionSettings().getRetrySettings().toBuilder()
+ *     .setTotalTimeout(Duration.ofSeconds(30));
+ * BigQueryReadSettings bigQueryReadSettings = bigQueryReadSettingsBuilder.build();
+ * 
+ * 
+ */ +@BetaApi +public class BigQueryReadSettings extends ClientSettings { + /** Returns the object with the settings used for calls to createReadSession. */ + public UnaryCallSettings createReadSessionSettings() { + return getTypedStubSettings().createReadSessionSettings(); + } + + /** Returns the object with the settings used for calls to readRows. */ + public ServerStreamingCallSettings readRowsSettings() { + return getTypedStubSettings().readRowsSettings(); + } + + /** Returns the object with the settings used for calls to splitReadStream. */ + public UnaryCallSettings + splitReadStreamSettings() { + return getTypedStubSettings().splitReadStreamSettings(); + } + + EnhancedBigQueryReadStubSettings getTypedStubSettings() { + return (EnhancedBigQueryReadStubSettings) getStubSettings(); + } + + public static final BigQueryReadSettings create(EnhancedBigQueryReadStubSettings stub) + throws IOException { + return new BigQueryReadSettings.Builder(stub.toBuilder()).build(); + } + + /** Returns a builder for the default ExecutorProvider for this service. */ + public static InstantiatingExecutorProvider.Builder defaultExecutorProviderBuilder() { + return EnhancedBigQueryReadStubSettings.defaultExecutorProviderBuilder(); + } + + /** Returns the default service endpoint. */ + public static String getDefaultEndpoint() { + return EnhancedBigQueryReadStubSettings.getDefaultEndpoint(); + } + + /** Returns the default service scopes. */ + public static List getDefaultServiceScopes() { + return EnhancedBigQueryReadStubSettings.getDefaultServiceScopes(); + } + + /** Returns a builder for the default credentials for this service. */ + public static GoogleCredentialsProvider.Builder defaultCredentialsProviderBuilder() { + return EnhancedBigQueryReadStubSettings.defaultCredentialsProviderBuilder(); + } + + /** Returns a builder for the default ChannelProvider for this service. 
*/ + public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { + return EnhancedBigQueryReadStubSettings.defaultGrpcTransportProviderBuilder(); + } + + public static TransportChannelProvider defaultTransportChannelProvider() { + return EnhancedBigQueryReadStubSettings.defaultTransportChannelProvider(); + } + + @BetaApi("The surface for customizing headers is not stable yet and may change in the future.") + public static ApiClientHeaderProvider.Builder defaultApiClientHeaderProviderBuilder() { + return EnhancedBigQueryReadStubSettings.defaultApiClientHeaderProviderBuilder(); + } + + /** Returns a new builder for this class. */ + public static Builder newBuilder() { + return Builder.createDefault(); + } + + /** Returns a new builder for this class. */ + public static Builder newBuilder(ClientContext clientContext) { + return new Builder(clientContext); + } + + /** Returns a builder containing all the values of this settings class. */ + public Builder toBuilder() { + return new Builder(this); + } + + protected BigQueryReadSettings(Builder settingsBuilder) throws IOException { + super(settingsBuilder); + } + + /** Builder for BigQueryReadSettings. 
*/ + public static class Builder extends ClientSettings.Builder { + protected Builder() throws IOException { + this((ClientContext) null); + } + + protected Builder(ClientContext clientContext) { + super(EnhancedBigQueryReadStubSettings.newBuilder(clientContext)); + } + + private static Builder createDefault() { + return new Builder(EnhancedBigQueryReadStubSettings.newBuilder()); + } + + protected Builder(BigQueryReadSettings settings) { + super(settings.getStubSettings().toBuilder()); + } + + protected Builder(EnhancedBigQueryReadStubSettings.Builder stubSettings) { + super(stubSettings); + } + + public EnhancedBigQueryReadStubSettings.Builder getStubSettingsBuilder() { + return ((EnhancedBigQueryReadStubSettings.Builder) getStubSettings()); + } + + // NEXT_MAJOR_VER: remove 'throws Exception' + /** + * Applies the given settings updater function to all of the unary API methods in this service. + * + *

Note: This method does not support applying settings to streaming methods. + */ + public Builder applyToAllUnaryMethods( + ApiFunction, Void> settingsUpdater) throws Exception { + super.applyToAllUnaryMethods( + getStubSettingsBuilder().unaryMethodSettingsBuilders(), settingsUpdater); + return this; + } + + /** Returns the builder for the settings used for calls to createReadSession. */ + public UnaryCallSettings.Builder + createReadSessionSettings() { + return getStubSettingsBuilder().createReadSessionSettings(); + } + + /** Returns the builder for the settings used for calls to readRows. */ + public ServerStreamingCallSettings.Builder + readRowsSettings() { + return getStubSettingsBuilder().readRowsSettings(); + } + + /** Returns the builder for the settings used for calls to splitReadStream. */ + public UnaryCallSettings.Builder + splitReadStreamSettings() { + return getStubSettingsBuilder().splitReadStreamSettings(); + } + + @Override + public BigQueryReadSettings build() throws IOException { + return new BigQueryReadSettings(this); + } + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java new file mode 100644 index 0000000000..16b768d09a --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java @@ -0,0 +1,121 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1.stub; + +import com.google.api.core.InternalApi; +import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Enhanced stub class for BigQuery Storage API. + * + *

This class is for advanced usage and reflects the underlying API directly. + */ +public class EnhancedBigQueryReadStub implements BackgroundResource { + private final GrpcBigQueryReadStub stub; + + public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings) + throws IOException { + // Configure the base settings. + BigQueryReadStubSettings.Builder baseSettingsBuilder = + BigQueryReadStubSettings.newBuilder() + .setTransportChannelProvider(settings.getTransportChannelProvider()) + .setEndpoint(settings.getEndpoint()) + .setHeaderProvider(settings.getHeaderProvider()) + .setCredentialsProvider(settings.getCredentialsProvider()) + .setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval()) + .setStreamWatchdogProvider(settings.getStreamWatchdogProvider()); + + baseSettingsBuilder + .createReadSessionSettings() + .setRetryableCodes(settings.createReadSessionSettings().getRetryableCodes()) + .setRetrySettings(settings.createReadSessionSettings().getRetrySettings()); + + baseSettingsBuilder + .readRowsSettings() + .setRetryableCodes(settings.readRowsSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowsSettings().getRetrySettings()) + .setResumptionStrategy(settings.readRowsSettings().getResumptionStrategy()) + .setIdleTimeout(settings.readRowsSettings().getIdleTimeout()); + + baseSettingsBuilder + .splitReadStreamSettings() + .setRetryableCodes(settings.splitReadStreamSettings().getRetryableCodes()) + .setRetrySettings(settings.splitReadStreamSettings().getRetrySettings()); + + BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build(); + ClientContext clientContext = ClientContext.create(baseSettings); + GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext); + return new EnhancedBigQueryReadStub(stub); + } + + @InternalApi("Visible for testing") + EnhancedBigQueryReadStub(GrpcBigQueryReadStub stub) { + this.stub = stub; + } + + public UnaryCallable 
createReadSessionCallable() { + return stub.createReadSessionCallable(); + } + + public ServerStreamingCallable readRowsCallable() { + return stub.readRowsCallable(); + } + + public UnaryCallable splitReadStreamCallable() { + return stub.splitReadStreamCallable(); + } + + @Override + public void close() { + stub.close(); + } + + @Override + public void shutdown() { + stub.shutdown(); + } + + @Override + public boolean isShutdown() { + return stub.isShutdown(); + } + + @Override + public boolean isTerminated() { + return stub.isTerminated(); + } + + @Override + public void shutdownNow() { + stub.shutdownNow(); + } + + @Override + public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { + return stub.awaitTermination(duration, unit); + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettings.java new file mode 100644 index 0000000000..190f355779 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettings.java @@ -0,0 +1,232 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigquery.storage.v1.stub; + +import com.google.api.core.ApiFunction; +import com.google.api.core.BetaApi; +import com.google.api.gax.core.GoogleCredentialsProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.StubSettings; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.cloud.bigquery.storage.v1.BaseBigQueryReadSettings; +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse; +import com.google.cloud.bigquery.storage.v1.stub.readrows.ReadRowsResumptionStrategy; +import com.google.common.collect.ImmutableList; +import java.util.List; + +/** + * Settings class to configure an instance of {@link EnhancedBigQueryReadStub}. + * + *

The default instance dynamically reads and applies the default values used by {@link + * BigQueryReadStub}. + * + *

The builder of this class is recursive, so contained classes are themselves builders. When + * build() is called, the tree of builders is called to create the complete settings object. For + * example, to set the total timeout of createReadSession to 30 seconds: + * + *

+ * 
+ * EnhancedBigQueryReadStubSettings.Builder builder =
+ *     EnhancedBigQueryReadStubSettings.newBuilder();
+ * builder.createReadSessionSettings().getRetrySettings().toBuilder()
+ *     .setTotalTimeout(Duration.ofSeconds(30));
+ * EnhancedBigQueryReadStubSettings settings = builder.build();
+ * 
+ * 
+ */ +public class EnhancedBigQueryReadStubSettings + extends StubSettings { + + private final UnaryCallSettings createReadSessionSettings; + private final ServerStreamingCallSettings readRowsSettings; + private final UnaryCallSettings + splitReadStreamSettings; + + /** Returns the object with the settings used for calls to createReadSession. */ + public UnaryCallSettings createReadSessionSettings() { + return createReadSessionSettings; + } + + /** Returns the object with the settings used for calls to readRows. */ + public ServerStreamingCallSettings readRowsSettings() { + return readRowsSettings; + } + + /** Returns the object with the settings used for calls to splitReadStream. */ + public UnaryCallSettings + splitReadStreamSettings() { + return splitReadStreamSettings; + } + + /** Returns a builder for the default ExecutorProvider for this service. */ + public static InstantiatingExecutorProvider.Builder defaultExecutorProviderBuilder() { + return BigQueryReadStubSettings.defaultExecutorProviderBuilder(); + } + + /** Returns the default service endpoint. */ + public static String getDefaultEndpoint() { + return BigQueryReadStubSettings.getDefaultEndpoint(); + } + + /** Returns the default service scopes. */ + public static List getDefaultServiceScopes() { + return BigQueryReadStubSettings.getDefaultServiceScopes(); + } + + /** Returns a builder for the default credentials for this service. */ + public static GoogleCredentialsProvider.Builder defaultCredentialsProviderBuilder() { + return BaseBigQueryReadSettings.defaultCredentialsProviderBuilder(); + } + + /** Returns a builder for the default ChannelProvider for this service. 
*/ + public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { + return BigQueryReadStubSettings.defaultGrpcTransportProviderBuilder(); + } + + public static TransportChannelProvider defaultTransportChannelProvider() { + return defaultGrpcTransportProviderBuilder().build(); + } + + @BetaApi("The surface for customizing headers is not stable yet and may change in the future.") + public static ApiClientHeaderProvider.Builder defaultApiClientHeaderProviderBuilder() { + return BigQueryReadStubSettings.defaultApiClientHeaderProviderBuilder(); + } + + /** Returns a new builder for this class. */ + public static Builder newBuilder() { + return new Builder(); + } + + /** Returns a new builder for this class. */ + public static Builder newBuilder(ClientContext clientContext) { + return new Builder(clientContext); + } + + /** Returns a builder containing all the values of this settings class. */ + public Builder toBuilder() { + return new Builder(this); + } + + protected EnhancedBigQueryReadStubSettings(Builder settingsBuilder) { + super(settingsBuilder); + + createReadSessionSettings = settingsBuilder.createReadSessionSettings().build(); + readRowsSettings = settingsBuilder.readRowsSettings().build(); + splitReadStreamSettings = settingsBuilder.splitReadStreamSettings().build(); + } + + /** Builder for {@link EnhancedBigQueryReadStubSettings}. 
*/ + public static class Builder + extends StubSettings.Builder { + private final ImmutableList> unaryMethodSettingsBuilders; + + private final UnaryCallSettings.Builder + createReadSessionSettings; + private final ServerStreamingCallSettings.Builder + readRowsSettings; + private final UnaryCallSettings.Builder + splitReadStreamSettings; + + protected Builder() { + this((ClientContext) null); + } + + protected Builder(ClientContext clientContext) { + super(clientContext); + + // Defaults provider + BigQueryReadStubSettings.Builder baseDefaults = BigQueryReadStubSettings.newBuilder(); + setEndpoint(baseDefaults.getEndpoint()); + setTransportChannelProvider(defaultTransportChannelProvider()); + setCredentialsProvider(baseDefaults.getCredentialsProvider()); + setStreamWatchdogCheckInterval(baseDefaults.getStreamWatchdogCheckInterval()); + setStreamWatchdogProvider(baseDefaults.getStreamWatchdogProvider()); + + // Per-method settings using baseSettings for defaults. + createReadSessionSettings = baseDefaults.createReadSessionSettings(); + splitReadStreamSettings = baseDefaults.splitReadStreamSettings(); + + // Per-method settings using override values for defaults. + readRowsSettings = + baseDefaults.readRowsSettings().setResumptionStrategy(new ReadRowsResumptionStrategy()); + + unaryMethodSettingsBuilders = + ImmutableList.>of( + createReadSessionSettings, splitReadStreamSettings); + } + + protected Builder(EnhancedBigQueryReadStubSettings settings) { + super(settings); + + createReadSessionSettings = settings.createReadSessionSettings.toBuilder(); + readRowsSettings = settings.readRowsSettings.toBuilder(); + splitReadStreamSettings = settings.splitReadStreamSettings.toBuilder(); + + unaryMethodSettingsBuilders = + ImmutableList.>of( + createReadSessionSettings, splitReadStreamSettings); + } + + // NEXT_MAJOR_VER: remove 'throws Exception' + /** + * Applies the given settings updater function to all of the unary API methods in this service. + * + *

Note: This method does not support applying settings to streaming methods. + */ + public Builder applyToAllUnaryMethods( + ApiFunction, Void> settingsUpdater) throws Exception { + super.applyToAllUnaryMethods(unaryMethodSettingsBuilders, settingsUpdater); + return this; + } + + public ImmutableList> unaryMethodSettingsBuilders() { + return unaryMethodSettingsBuilders; + } + + /** Returns the builder for the settings used for calls to createReadSession. */ + public UnaryCallSettings.Builder + createReadSessionSettings() { + return createReadSessionSettings; + } + + /** Returns the builder for the settings used for calls to readRows. */ + public ServerStreamingCallSettings.Builder + readRowsSettings() { + return readRowsSettings; + } + + /** Returns the builder for the settings used for calls to splitReadStream. */ + public UnaryCallSettings.Builder + splitReadStreamSettings() { + return splitReadStreamSettings; + } + + @Override + public EnhancedBigQueryReadStubSettings build() { + return new EnhancedBigQueryReadStubSettings(this); + } + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsResumptionStrategy.java new file mode 100644 index 0000000000..e14d58b58d --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsResumptionStrategy.java @@ -0,0 +1,72 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1.stub.readrows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import javax.annotation.Nonnull; + +/** + * An implementation of a {@link StreamResumptionStrategy} for the ReadRows API. This class tracks + * the offset of the last row received and, upon retry, attempts to resume the stream at the next + * offset. + * + *

This class is considered an internal implementation detail and not meant to be used by + * applications. + */ +@InternalApi +public class ReadRowsResumptionStrategy + implements StreamResumptionStrategy { + + // Number of rows processed. + private long rowsProcessed = 0; + + @Override + @Nonnull + public StreamResumptionStrategy createNew() { + return new ReadRowsResumptionStrategy(); + } + + @Override + @Nonnull + public ReadRowsResponse processResponse(ReadRowsResponse response) { + rowsProcessed += response.getRowCount(); + return response; + } + + /** + * {@inheritDoc} + * + *

Given the initial/original request, this implementation generates a request that will yield + * a new stream whose first response would come right after the last response received by + * processResponse. It takes into account the offset from the original request. + */ + @Override + public ReadRowsRequest getResumeRequest(ReadRowsRequest originalRequest) { + ReadRowsRequest.Builder resumeRequestBuilder = originalRequest.toBuilder(); + + resumeRequestBuilder.setOffset(originalRequest.getOffset() + rowsProcessed); + + return resumeRequestBuilder.build(); + } + + @Override + public boolean canResume() { + return true; + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/package-info.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/package-info.java new file mode 100644 index 0000000000..111054d34f --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/package-info.java @@ -0,0 +1,16 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigquery.storage.v1.stub.readrows; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java new file mode 100644 index 0000000000..01f9101a7e --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java @@ -0,0 +1,165 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigquery.storage.v1; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GaxGrpcProperties; +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.api.gax.grpc.testing.MockStreamObserver; +import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StatusCode; +import com.google.protobuf.AbstractMessage; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class BigQueryReadClientTest { + private static MockBigQueryRead mockBigQueryRead; + private static MockServiceHelper serviceHelper; + private BigQueryReadClient client; + private LocalChannelProvider channelProvider; + + @BeforeClass + public static void startStaticServer() { + mockBigQueryRead = new MockBigQueryRead(); + serviceHelper = + new MockServiceHelper( + UUID.randomUUID().toString(), Arrays.asList(mockBigQueryRead)); + serviceHelper.start(); + } + + @AfterClass + public static void stopServer() { + serviceHelper.stop(); + } + + @Before + public void setUp() throws IOException { + serviceHelper.reset(); + channelProvider = serviceHelper.createChannelProvider(); + BigQueryReadSettings settings = + BigQueryReadSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()) + .build(); + client = BigQueryReadClient.create(settings); + } + + @After + public void tearDown() 
throws Exception { + client.close(); + } + + @Test + @SuppressWarnings("all") + public void createReadSessionTest() { + String name = "name3373707"; + String table = "table110115790"; + ReadSession expectedResponse = ReadSession.newBuilder().setName(name).setTable(table).build(); + mockBigQueryRead.addResponse(expectedResponse); + + String parent = "parent-995424086"; + ReadSession readSession = ReadSession.newBuilder().build(); + int maxStreamCount = 940837515; + + ReadSession actualResponse = client.createReadSession(parent, readSession, maxStreamCount); + Assert.assertEquals(expectedResponse, actualResponse); + + List actualRequests = mockBigQueryRead.getRequests(); + Assert.assertEquals(1, actualRequests.size()); + CreateReadSessionRequest actualRequest = (CreateReadSessionRequest) actualRequests.get(0); + + Assert.assertEquals(parent, actualRequest.getParent()); + Assert.assertEquals(readSession, actualRequest.getReadSession()); + Assert.assertEquals(maxStreamCount, actualRequest.getMaxStreamCount()); + Assert.assertTrue( + channelProvider.isHeaderSent( + ApiClientHeaderProvider.getDefaultApiClientHeaderKey(), + GaxGrpcProperties.getDefaultApiClientHeaderPattern())); + } + + @Test + @SuppressWarnings("all") + public void createReadSessionExceptionTest() throws Exception { + StatusRuntimeException exception = new StatusRuntimeException(Status.INVALID_ARGUMENT); + mockBigQueryRead.addException(exception); + + try { + String parent = "parent-995424086"; + ReadSession readSession = ReadSession.newBuilder().build(); + int maxStreamCount = 940837515; + + client.createReadSession(parent, readSession, maxStreamCount); + Assert.fail("No exception raised"); + } catch (InvalidArgumentException e) { + // Expected exception + } + } + + @Test + @SuppressWarnings("all") + public void readRowsTest() throws Exception { + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + 
mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + Assert.assertEquals(expectedResponse, actualResponses.get(0)); + } + + @Test + @SuppressWarnings("all") + public void readRowsExceptionTest() throws Exception { + StatusRuntimeException exception = new StatusRuntimeException(Status.INVALID_ARGUMENT); + mockBigQueryRead.addException(exception); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + try { + List actualResponses = responseObserver.future().get(); + Assert.fail("No exception thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof InvalidArgumentException); + InvalidArgumentException apiException = (InvalidArgumentException) e.getCause(); + Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); + } + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettingsTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettingsTest.java new file mode 100644 index 0000000000..d86067b1bc --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStubSettingsTest.java @@ -0,0 +1,142 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file 
except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.api.gax.rpc.WatchdogProvider; +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse; +import java.util.Set; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class EnhancedBigQueryReadStubSettingsTest { + + @Test + public void testSettingsArePreserved() { + String endpoint = "some.other.host:123"; + CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); + Duration watchdogInterval = Duration.ofSeconds(12); + WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class); + + EnhancedBigQueryReadStubSettings.Builder builder = + 
EnhancedBigQueryReadStubSettings.newBuilder() + .setEndpoint(endpoint) + .setCredentialsProvider(credentialsProvider) + .setStreamWatchdogCheckInterval(watchdogInterval) + .setStreamWatchdogProvider(watchdogProvider); + + verifyBuilder(builder, endpoint, credentialsProvider, watchdogInterval, watchdogProvider); + + verifySettings( + builder.build(), endpoint, credentialsProvider, watchdogInterval, watchdogProvider); + + verifyBuilder( + builder.build().toBuilder(), + endpoint, + credentialsProvider, + watchdogInterval, + watchdogProvider); + } + + private void verifyBuilder( + EnhancedBigQueryReadStubSettings.Builder builder, + String endpoint, + CredentialsProvider credentialsProvider, + Duration watchdogInterval, + WatchdogProvider watchdogProvider) { + assertThat(builder.getEndpoint()).isEqualTo(endpoint); + assertThat(builder.getCredentialsProvider()).isEqualTo(credentialsProvider); + assertThat(builder.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); + assertThat(builder.getStreamWatchdogProvider()).isEqualTo(watchdogProvider); + + InstantiatingGrpcChannelProvider channelProvider = + (InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider(); + assertThat(channelProvider.toBuilder().getMaxInboundMessageSize()).isEqualTo(Integer.MAX_VALUE); + } + + private void verifySettings( + EnhancedBigQueryReadStubSettings settings, + String endpoint, + CredentialsProvider credentialsProvider, + Duration watchdogInterval, + WatchdogProvider watchdogProvider) { + assertThat(settings.getEndpoint()).isEqualTo(endpoint); + assertThat(settings.getCredentialsProvider()).isEqualTo(credentialsProvider); + assertThat(settings.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); + assertThat(settings.getStreamWatchdogProvider()).isEqualTo(watchdogProvider); + + InstantiatingGrpcChannelProvider channelProvider = + (InstantiatingGrpcChannelProvider) settings.getTransportChannelProvider(); + 
assertThat(channelProvider.toBuilder().getMaxInboundMessageSize()).isEqualTo(Integer.MAX_VALUE); + } + + @Test + public void testCreateReadSessionSettings() { + UnaryCallSettings.Builder builder = + EnhancedBigQueryReadStubSettings.newBuilder().createReadSessionSettings(); + verifyRetrySettings(builder.getRetryableCodes(), builder.getRetrySettings()); + } + + @Test + public void testReadRowsSettings() { + ServerStreamingCallSettings.Builder builder = + EnhancedBigQueryReadStubSettings.newBuilder().readRowsSettings(); + assertThat(builder.getRetryableCodes()).contains(Code.UNAVAILABLE); + RetrySettings retrySettings = builder.getRetrySettings(); + assertThat(retrySettings.getInitialRetryDelay()).isEqualTo(Duration.ofMillis(100L)); + assertThat(retrySettings.getRetryDelayMultiplier()).isWithin(1e-6).of(1.3); + assertThat(retrySettings.getMaxRetryDelay()).isEqualTo(Duration.ofMinutes(1L)); + assertThat(retrySettings.getInitialRpcTimeout()).isEqualTo(Duration.ofDays(1L)); + assertThat(retrySettings.getRpcTimeoutMultiplier()).isWithin(1e-6).of(1.0); + assertThat(retrySettings.getMaxRpcTimeout()).isEqualTo(Duration.ofDays(1L)); + assertThat(retrySettings.getTotalTimeout()).isEqualTo(Duration.ofDays(1L)); + assertThat(builder.getIdleTimeout()).isEqualTo(Duration.ZERO); + } + + @Test + public void testSplitReadStreamSettings() { + UnaryCallSettings.Builder builder = + EnhancedBigQueryReadStubSettings.newBuilder().splitReadStreamSettings(); + verifyRetrySettings(builder.getRetryableCodes(), builder.getRetrySettings()); + } + + private void verifyRetrySettings(Set retryCodes, RetrySettings retrySettings) { + assertThat(retryCodes).contains(Code.UNAVAILABLE); + assertThat(retrySettings.getTotalTimeout()).isGreaterThan(Duration.ZERO); + assertThat(retrySettings.getInitialRetryDelay()).isGreaterThan(Duration.ZERO); + assertThat(retrySettings.getRetryDelayMultiplier()).isAtLeast(1.0); + assertThat(retrySettings.getMaxRetryDelay()).isGreaterThan(Duration.ZERO); + 
assertThat(retrySettings.getInitialRpcTimeout()).isGreaterThan(Duration.ZERO); + assertThat(retrySettings.getRpcTimeoutMultiplier()).isAtLeast(1.0); + assertThat(retrySettings.getMaxRpcTimeout()).isGreaterThan(Duration.ZERO); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java new file mode 100644 index 0000000000..82e533dc05 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/ResourceHeaderTest.java @@ -0,0 +1,140 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigquery.storage.v1.stub; + +import static com.google.common.truth.Truth.assertWithMessage; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.InProcessServer; +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.rpc.FixedHeaderProvider; +import com.google.api.gax.rpc.UnimplementedException; +import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc.BigQueryReadImplBase; +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; +import java.util.regex.Pattern; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ResourceHeaderTest { + + private static final String TEST_TABLE_REFERENCE = + "projects/project/datasets/dataset/tables/table"; + + private static final String TEST_STREAM_NAME = "streamName"; + + private static final String NAME = "resource-header-test:123"; + + private static final String HEADER_NAME = "x-goog-request-params"; + + private static final Pattern READ_SESSION_NAME_PATTERN = + Pattern.compile( + ".*" + "read_session\\.table=projects/project/datasets/dataset/tables/table" + ".*"); + private static final Pattern READ_STREAM_PATTERN = + Pattern.compile(".*" + "read_stream=streamName" + ".*"); + private static final Pattern STREAM_NAME_PATTERN = + Pattern.compile(".*" + "name=streamName" + ".*"); + + private static final String TEST_HEADER_NAME = "simple-header-name"; + private static final String TEST_HEADER_VALUE = "simple-header-value"; + private static final Pattern TEST_PATTERN = 
Pattern.compile(".*" + TEST_HEADER_VALUE + ".*"); + + private static InProcessServer server; + + private LocalChannelProvider channelProvider; + private BigQueryReadClient client; + + @BeforeClass + public static void setUpClass() throws Exception { + server = new InProcessServer<>(new BigQueryReadImplBase() {}, NAME); + server.start(); + } + + @Before + public void setUp() throws Exception { + channelProvider = LocalChannelProvider.create(NAME); + BigQueryReadSettings.Builder settingsBuilder = + BigQueryReadSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setHeaderProvider(FixedHeaderProvider.create(TEST_HEADER_NAME, TEST_HEADER_VALUE)) + .setTransportChannelProvider(channelProvider); + client = BigQueryReadClient.create(settingsBuilder.build()); + } + + @After + public void tearDown() throws Exception { + client.close(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + server.stop(); + server.blockUntilShutdown(); + } + + @Test + public void createReadSessionTest() { + try { + client.createReadSession( + "parents/project", ReadSession.newBuilder().setTable(TEST_TABLE_REFERENCE).build(), 1); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. + } + verifyHeaderSent(READ_SESSION_NAME_PATTERN); + } + + @Test + public void readRowsTest() { + try { + ReadRowsRequest request = + ReadRowsRequest.newBuilder().setReadStream(TEST_STREAM_NAME).setOffset(125).build(); + client.readRowsCallable().call(request); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. + } + + verifyHeaderSent(READ_STREAM_PATTERN); + } + + @Test + public void splitReadStreamTest() { + try { + client.splitReadStream(SplitReadStreamRequest.newBuilder().setName(TEST_STREAM_NAME).build()); + } catch (UnimplementedException e) { + // Ignore the error: none of the methods are actually implemented. 
+ } + + verifyHeaderSent(STREAM_NAME_PATTERN); + } + + private void verifyHeaderSent(Pattern... patterns) { + for (Pattern pattern : patterns) { + boolean headerSent = channelProvider.isHeaderSent(HEADER_NAME, pattern); + assertWithMessage("Generated header was sent").that(headerSent).isTrue(); + } + boolean testHeaderSent = channelProvider.isHeaderSent(TEST_HEADER_NAME, TEST_PATTERN); + assertWithMessage("Provided header was sent").that(testHeaderSent).isTrue(); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsRetryTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsRetryTest.java new file mode 100644 index 0000000000..2a2e513bec --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ReadRowsRetryTest.java @@ -0,0 +1,243 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigquery.storage.v1.stub.readrows; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.ServerStream; +import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc.BigQueryReadImplBase; +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.common.collect.Queues; +import io.grpc.Status.Code; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ReadRowsRetryTest { + + @Rule public GrpcServerRule serverRule = new GrpcServerRule(); + + private TestBigQueryStorageService service; + private BigQueryReadClient client; + + @Before + public void setUp() throws IOException { + service = new TestBigQueryStorageService(); + serverRule.getServiceRegistry().addService(service); + + BigQueryReadSettings settings = + BigQueryReadSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider( + FixedTransportChannelProvider.create( + GrpcTransportChannel.create(serverRule.getChannel()))) + .build(); + + client = BigQueryReadClient.create(settings); + } + + @After + public void tearDown() throws Exception { + client.close(); + } + + @Test + public void happyPathTest() { + ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0); + 
service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 0) + .respondWithNumberOfRows(10) + .respondWithNumberOfRows(7)); + + Assert.assertEquals(17, getRowCount(request)); + } + + @Test + public void immediateRetryTest() { + ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0); + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 0) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 0) + .respondWithNumberOfRows(10) + .respondWithNumberOfRows(7)); + + Assert.assertEquals(17, getRowCount(request)); + } + + @Test + public void multipleRetryTestWithZeroInitialOffset() { + ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0); + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 0) + .respondWithNumberOfRows(5) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 5) + .respondWithNumberOfRows(10) + .respondWithNumberOfRows(7) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create().expectRequest("fake-stream", 22).respondWithNumberOfRows(6)); + + Assert.assertEquals(28, getRowCount(request)); + } + + @Test + public void multipleRetryTestWithNonZeroInitialOffset() { + ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 17); + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 17) + .respondWithNumberOfRows(5) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 22) + .respondWithNumberOfRows(10) + .respondWithNumberOfRows(7) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create().expectRequest("fake-stream", 39).respondWithNumberOfRows(3)); + + Assert.assertEquals(25, 
getRowCount(request)); + } + + @Test + public void errorAtTheVeryEndTest() { + ReadRowsRequest request = RpcExpectation.createRequest("fake-stream", 0); + service.expectations.add( + RpcExpectation.create() + .expectRequest("fake-stream", 0) + .respondWithNumberOfRows(10) + .respondWithNumberOfRows(7) + .respondWithStatus(Code.UNAVAILABLE)); + + service.expectations.add( + RpcExpectation.create().expectRequest("fake-stream", 17).respondWithNumberOfRows(0)); + + Assert.assertEquals(17, getRowCount(request)); + } + + private int getRowCount(ReadRowsRequest request) { + ServerStream serverStream = client.readRowsCallable().call(request); + int rowCount = 0; + for (ReadRowsResponse readRowsResponse : serverStream) { + rowCount += readRowsResponse.getRowCount(); + } + return rowCount; + } + + private static class TestBigQueryStorageService extends BigQueryReadImplBase { + + Queue expectations = Queues.newArrayDeque(); + int currentRequestIndex = -1; + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + + RpcExpectation expectedRpc = expectations.poll(); + currentRequestIndex++; + + Assert.assertNotNull( + "Unexpected request #" + currentRequestIndex + ": " + request.toString(), expectedRpc); + + Assert.assertEquals( + "Expected request #" + + currentRequestIndex + + " does not match actual request: " + + request.toString(), + expectedRpc.expectedRequest, + request); + + for (ReadRowsResponse response : expectedRpc.responses) { + responseObserver.onNext(response); + } + + if (expectedRpc.statusCode.toStatus().isOk()) { + responseObserver.onCompleted(); + } else { + responseObserver.onError(expectedRpc.statusCode.toStatus().asRuntimeException()); + } + } + } + + private static class RpcExpectation { + + ReadRowsRequest expectedRequest; + Code statusCode; + List responses; + + private RpcExpectation() { + statusCode = Code.OK; + responses = new ArrayList<>(); + } + + static RpcExpectation create() { + return new 
RpcExpectation(); + } + + static ReadRowsRequest createRequest(String streamName, long offset) { + return ReadRowsRequest.newBuilder().setReadStream(streamName).setOffset(offset).build(); + } + + static ReadRowsResponse createResponse(int numberOfRows) { + return ReadRowsResponse.newBuilder().setRowCount(numberOfRows).build(); + } + + RpcExpectation expectRequest(String streamName, long offset) { + expectedRequest = createRequest(streamName, offset); + return this; + } + + RpcExpectation respondWithNumberOfRows(int numberOfRows) { + responses.add(createResponse(numberOfRows)); + return this; + } + + RpcExpectation respondWithStatus(Code code) { + this.statusCode = code; + return this; + } + } +} From 90ce4398ad9387485de9f2316738a58e6ffdc334 Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Fri, 7 Feb 2020 13:28:05 -0500 Subject: [PATCH 2/4] chore: release 0.123.1-SNAPSHOT (#67) * updated versions.txt [ci skip] * updated google-cloud-bigquerystorage-bom/pom.xml [ci skip] * updated google-cloud-bigquerystorage/pom.xml [ci skip] * updated grpc-google-cloud-bigquerystorage-v1/pom.xml [ci skip] * updated grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml [ci skip] * updated grpc-google-cloud-bigquerystorage-v1beta1/pom.xml [ci skip] * updated grpc-google-cloud-bigquerystorage-v1beta2/pom.xml [ci skip] * updated pom.xml [ci skip] * updated proto-google-cloud-bigquerystorage-v1/pom.xml [ci skip] * updated proto-google-cloud-bigquerystorage-v1alpha2/pom.xml [ci skip] * updated proto-google-cloud-bigquerystorage-v1beta1/pom.xml [ci skip] * updated proto-google-cloud-bigquerystorage-v1beta2/pom.xml [ci skip] --- google-cloud-bigquerystorage-bom/pom.xml | 20 +++++++++---------- google-cloud-bigquerystorage/pom.xml | 4 ++-- grpc-google-cloud-bigquerystorage-v1/pom.xml | 4 ++-- .../pom.xml | 4 ++-- .../pom.xml | 4 ++-- .../pom.xml | 4 ++-- pom.xml | 20 +++++++++---------- 
proto-google-cloud-bigquerystorage-v1/pom.xml | 4 ++-- .../pom.xml | 4 ++-- .../pom.xml | 4 ++-- .../pom.xml | 4 ++-- versions.txt | 18 ++++++++--------- 12 files changed, 47 insertions(+), 47 deletions(-) diff --git a/google-cloud-bigquerystorage-bom/pom.xml b/google-cloud-bigquerystorage-bom/pom.xml index 01cca2de45..00135e684a 100644 --- a/google-cloud-bigquerystorage-bom/pom.xml +++ b/google-cloud-bigquerystorage-bom/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-bigquerystorage-bom - 0.123.0-beta + 0.123.1-beta-SNAPSHOT pom com.google.cloud @@ -63,48 +63,48 @@ com.google.api.grpc proto-google-cloud-bigquerystorage-v1alpha2 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc proto-google-cloud-bigquerystorage-v1 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc grpc-google-cloud-bigquerystorage-v1alpha2 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 0.88.0 + 0.88.1-SNAPSHOT com.google.cloud google-cloud-bigquerystorage - 0.123.0-beta + 0.123.1-beta-SNAPSHOT diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index 2e27f38ced..6da3d004f5 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-bigquerystorage - 0.123.0-beta + 0.123.1-beta-SNAPSHOT jar BigQuery Storage https://github.com/googleapis/java-bigquerystorage @@ -11,7 +11,7 @@ com.google.cloud google-cloud-bigquerystorage-parent - 0.123.0-beta + 0.123.1-beta-SNAPSHOT google-cloud-bigquerystorage diff --git a/grpc-google-cloud-bigquerystorage-v1/pom.xml 
b/grpc-google-cloud-bigquerystorage-v1/pom.xml index e1c552cdcf..515f776626 100644 --- a/grpc-google-cloud-bigquerystorage-v1/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 0.88.0 + 0.88.1-SNAPSHOT grpc-google-cloud-bigquerystorage-v1 GRPC library for grpc-google-cloud-bigquerystorage-v1 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.0-beta + 0.123.1-beta-SNAPSHOT diff --git a/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml b/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml index e1b7003fa3..af828b8a88 100644 --- a/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1alpha2 - 0.88.0 + 0.88.1-SNAPSHOT grpc-google-cloud-bigquerystorage-v1alpha2 GRPC library for grpc-google-cloud-bigquerystorage-v1alpha2 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.0-beta + 0.123.1-beta-SNAPSHOT diff --git a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml index c0bae6af40..a87997ed7a 100644 --- a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.88.0 + 0.88.1-SNAPSHOT grpc-google-cloud-bigquerystorage-v1beta1 GRPC library for grpc-google-cloud-bigquerystorage-v1beta1 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.0-beta + 0.123.1-beta-SNAPSHOT diff --git a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml index adc72bd957..b8e553ed9d 100644 --- a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.88.0 + 0.88.1-SNAPSHOT 
grpc-google-cloud-bigquerystorage-v1beta2 GRPC library for grpc-google-cloud-bigquerystorage-v1beta2 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.0-beta + 0.123.1-beta-SNAPSHOT diff --git a/pom.xml b/pom.xml index 57d80a76d5..0207db9bf1 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.google.cloud google-cloud-bigquerystorage-parent pom - 0.123.0-beta + 0.123.1-beta-SNAPSHOT BigQuery Storage Parent https://github.com/googleapis/java-bigquerystorage @@ -83,43 +83,43 @@ com.google.api.grpc proto-google-cloud-bigquerystorage-v1alpha2 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc proto-google-cloud-bigquerystorage-v1 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc grpc-google-cloud-bigquerystorage-v1alpha2 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.88.0 + 0.88.1-SNAPSHOT com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 0.88.0 + 0.88.1-SNAPSHOT com.google.auto.value @@ -134,7 +134,7 @@ com.google.cloud google-cloud-bigquerystorage - 0.123.0-beta + 0.123.1-beta-SNAPSHOT diff --git a/proto-google-cloud-bigquerystorage-v1/pom.xml b/proto-google-cloud-bigquerystorage-v1/pom.xml index 05211b82f9..4c7c9658ee 100644 --- a/proto-google-cloud-bigquerystorage-v1/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1 - 0.88.0 + 0.88.1-SNAPSHOT proto-google-cloud-bigquerystorage-v1 PROTO library for proto-google-cloud-bigquerystorage-v1 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.0-beta + 0.123.1-beta-SNAPSHOT diff --git a/proto-google-cloud-bigquerystorage-v1alpha2/pom.xml b/proto-google-cloud-bigquerystorage-v1alpha2/pom.xml index 
685bda6c8c..520bbc9a93 100644 --- a/proto-google-cloud-bigquerystorage-v1alpha2/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1alpha2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1alpha2 - 0.88.0 + 0.88.1-SNAPSHOT proto-google-cloud-bigquerystorage-v1alpha2 PROTO library for proto-google-cloud-bigquerystorage-v1alpha2 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.0-beta + 0.123.1-beta-SNAPSHOT diff --git a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml index 53cfaa4f92..6d2bf288f3 100644 --- a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.88.0 + 0.88.1-SNAPSHOT proto-google-cloud-bigquerystorage-v1beta1 PROTO library for proto-google-cloud-bigquerystorage-v1beta1 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.0-beta + 0.123.1-beta-SNAPSHOT diff --git a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml index b368b833e9..68381cc61d 100644 --- a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.88.0 + 0.88.1-SNAPSHOT proto-google-cloud-bigquerystorage-v1beta2 PROTO library for proto-google-cloud-bigquerystorage-v1beta2 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.0-beta + 0.123.1-beta-SNAPSHOT diff --git a/versions.txt b/versions.txt index d8d6776580..9bd37c5f8b 100644 --- a/versions.txt +++ b/versions.txt @@ -1,12 +1,12 @@ # Format: # module:released-version:current-version -proto-google-cloud-bigquerystorage-v1alpha2:0.88.0:0.88.0 -proto-google-cloud-bigquerystorage-v1beta1:0.88.0:0.88.0 -proto-google-cloud-bigquerystorage-v1beta2:0.88.0:0.88.0 
-proto-google-cloud-bigquerystorage-v1:0.88.0:0.88.0 -grpc-google-cloud-bigquerystorage-v1alpha2:0.88.0:0.88.0 -grpc-google-cloud-bigquerystorage-v1beta1:0.88.0:0.88.0 -grpc-google-cloud-bigquerystorage-v1beta2:0.88.0:0.88.0 -grpc-google-cloud-bigquerystorage-v1:0.88.0:0.88.0 -google-cloud-bigquerystorage:0.123.0-beta:0.123.0-beta +proto-google-cloud-bigquerystorage-v1alpha2:0.88.0:0.88.1-SNAPSHOT +proto-google-cloud-bigquerystorage-v1beta1:0.88.0:0.88.1-SNAPSHOT +proto-google-cloud-bigquerystorage-v1beta2:0.88.0:0.88.1-SNAPSHOT +proto-google-cloud-bigquerystorage-v1:0.88.0:0.88.1-SNAPSHOT +grpc-google-cloud-bigquerystorage-v1alpha2:0.88.0:0.88.1-SNAPSHOT +grpc-google-cloud-bigquerystorage-v1beta1:0.88.0:0.88.1-SNAPSHOT +grpc-google-cloud-bigquerystorage-v1beta2:0.88.0:0.88.1-SNAPSHOT +grpc-google-cloud-bigquerystorage-v1:0.88.0:0.88.1-SNAPSHOT +google-cloud-bigquerystorage:0.123.0-beta:0.123.1-beta-SNAPSHOT From 8e7ac1511b9f9eaea417e6761848e4735039a831 Mon Sep 17 00:00:00 2001 From: Martin Mladenovski Date: Fri, 7 Feb 2020 12:07:26 -0800 Subject: [PATCH 3/4] feat: add v1 integration tests (#68) * feat: seed v1 integration tests with a copy from v1beta2 Upcoming commit will update the version and will make it easier to review. 
* feat: add v1 integration tests --- .../storage/v1/it/BigQueryResource.java | 34 + .../it/ITBigQueryStorageLongRunningTest.java | 147 +++ .../storage/v1/it/ITBigQueryStorageTest.java | 965 ++++++++++++++++++ .../storage/v1/it/SimpleRowReader.java | 77 ++ 4 files changed, 1223 insertions(+) create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/BigQueryResource.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReader.java diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/BigQueryResource.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/BigQueryResource.java new file mode 100644 index 0000000000..b42ff26e63 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/BigQueryResource.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1.it; + +/** Test helper class to generate BigQuery resource paths. 
*/ +public class BigQueryResource { + + /** + * Returns a BigQuery table resource path from the provided parameters into the following format: + * projects/{projectId}/datasets/{datasetId}/tables/{tableId} + * + * @param projectId + * @param datasetId + * @param tableId + * @return a path to a table resource. + */ + public static String FormatTableResource(String projectId, String datasetId, String tableId) { + return String.format("projects/%s/datasets/%s/tables/%s", projectId, datasetId, tableId); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java new file mode 100644 index 0000000000..f376565947 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java @@ -0,0 +1,147 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.bigquery.storage.v1.it; + +import static org.junit.Assert.assertEquals; + +import com.google.api.gax.rpc.ServerStream; +import com.google.cloud.ServiceOptions; +import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1.DataFormat; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.ReadStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.logging.Logger; +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Integration tests for BigQuery Storage API which target long running sessions. These tests can be + * enabled by setting the system property 'bigquery.storage.enable_long_running_tests' to true. + */ +public class ITBigQueryStorageLongRunningTest { + + private static final Logger LOG = + Logger.getLogger(ITBigQueryStorageLongRunningTest.class.getName()); + + private static final String LONG_TESTS_ENABLED_PROPERTY = + "bigquery.storage.enable_long_running_tests"; + + private static final String LONG_TESTS_DISABLED_MESSAGE = + String.format( + "BigQuery Storage long running tests are not enabled and will be skipped. 
" + + "To enable them, set system property '%s' to true.", + LONG_TESTS_ENABLED_PROPERTY); + + private static BigQueryReadClient client; + private static String parentProjectId; + + @BeforeClass + public static void beforeClass() throws IOException { + Assume.assumeTrue(LONG_TESTS_DISABLED_MESSAGE, Boolean.getBoolean(LONG_TESTS_ENABLED_PROPERTY)); + client = BigQueryReadClient.create(); + parentProjectId = String.format("projects/%s", ServiceOptions.getDefaultProjectId()); + + LOG.info( + String.format( + "%s tests running with parent project: %s", + ITBigQueryStorageLongRunningTest.class.getSimpleName(), parentProjectId)); + } + + @AfterClass + public static void afterClass() { + if (client != null) { + client.close(); + } + } + + @Test + public void testLongRunningReadSession() throws InterruptedException, ExecutionException { + // This test reads a larger table with the goal of doing a simple validation of timeout settings + // for a longer running session. + + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ "bigquery-public-data", + /* datasetId = */ "samples", + /* tableId = */ "wikipedia"); + + ReadSession session = + client.createReadSession( + /* parent = */ parentProjectId, + /* readSession = */ ReadSession.newBuilder() + .setTable(table) + .setDataFormat(DataFormat.AVRO) + .build(), + /* maxStreamCount = */ 5); + + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", + table, session.toString()), + 5, + session.getStreamsCount()); + + List> tasks = new ArrayList<>(session.getStreamsCount()); + for (final ReadStream stream : session.getStreamsList()) { + tasks.add( + new Callable() { + @Override + public Long call() throws Exception { + return readAllRowsFromStream(stream); + } + }); + } + + ExecutorService executor = Executors.newFixedThreadPool(tasks.size()); + List> results = executor.invokeAll(tasks); + + long rowCount = 0; + for (Future result : 
results) { + rowCount += result.get(); + } + + assertEquals(313_797_035, rowCount); + } + + private long readAllRowsFromStream(ReadStream readStream) { + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build(); + + long rowCount = 0; + ServerStream serverStream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : serverStream) { + rowCount += response.getRowCount(); + } + + LOG.info( + String.format("Read total of %d rows from stream '%s'.", rowCount, readStream.getName())); + return rowCount; + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java new file mode 100644 index 0000000000..b80c2c3e86 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java @@ -0,0 +1,965 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.bigquery.storage.v1.it; + +import static com.google.common.truth.Truth.assertWithMessage; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import com.google.api.gax.rpc.ServerStream; +import com.google.cloud.RetryOption; +import com.google.cloud.ServiceOptions; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Field.Mode; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobInfo.WriteDisposition; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.TimePartitioning; +import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.DataFormat; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers; +import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions; +import com.google.cloud.bigquery.storage.v1.ReadStream; +import com.google.cloud.bigquery.storage.v1.it.SimpleRowReader.AvroRowConsumer; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.common.base.Preconditions; +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.ArrayList; 
+import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.logging.Logger; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.util.Utf8; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.threeten.bp.Duration; +import org.threeten.bp.Instant; +import org.threeten.bp.LocalDate; +import org.threeten.bp.LocalTime; +import org.threeten.bp.ZoneOffset; +import org.threeten.bp.ZonedDateTime; +import org.threeten.bp.format.DateTimeFormatter; + +/** Integration tests for BigQuery Storage API. */ +public class ITBigQueryStorageTest { + + private static final Logger LOG = Logger.getLogger(ITBigQueryStorageTest.class.getName()); + private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + private static final String DESCRIPTION = "BigQuery Storage Java client test dataset"; + + private static BigQueryReadClient client; + private static String parentProjectId; + private static BigQuery bigquery; + + @BeforeClass + public static void beforeClass() throws IOException { + client = BigQueryReadClient.create(); + parentProjectId = String.format("projects/%s", ServiceOptions.getDefaultProjectId()); + + LOG.info( + String.format( + "%s tests running with parent project: %s", + ITBigQueryStorageTest.class.getSimpleName(), parentProjectId)); + + RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); + bigquery = bigqueryHelper.getOptions().getService(); + DatasetInfo datasetInfo = + DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); + bigquery.create(datasetInfo); + LOG.info("Created test dataset: " + DATASET); + } + + @AfterClass + public static void afterClass() { + if (client != null) { + client.close(); + } + + if 
(bigquery != null) { + RemoteBigQueryHelper.forceDelete(bigquery, DATASET); + LOG.info("Deleted test dataset: " + DATASET); + } + } + + @Test + public void testSimpleRead() { + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ "bigquery-public-data", + /* datasetId = */ "samples", + /* tableId = */ "shakespeare"); + + ReadSession session = + client.createReadSession( + /* parent = */ parentProjectId, + /* readSession = */ ReadSession.newBuilder() + .setTable(table) + .setDataFormat(DataFormat.AVRO) + .build(), + /* maxStreamCount = */ 1); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", + table, session.toString()), + 1, + session.getStreamsCount()); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); + + long rowCount = 0; + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + rowCount += response.getRowCount(); + } + + assertEquals(164_656, rowCount); + } + + @Test + public void testSimpleReadAndResume() { + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ "bigquery-public-data", + /* datasetId = */ "samples", + /* tableId = */ "shakespeare"); + + ReadSession session = + client.createReadSession( + /* parent = */ parentProjectId, + /* readSession = */ ReadSession.newBuilder() + .setTable(table) + .setDataFormat(DataFormat.AVRO) + .build(), + /* maxStreamCount = */ 1); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", + table, session.toString()), + 1, + session.getStreamsCount()); + + // We have to read some number of rows in order to be able to resume. 
More details: + + long rowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder() + .setReadStream(session.getStreams(0).getName()) + .setOffset(rowCount) + .build(); + + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + + for (ReadRowsResponse response : stream) { + rowCount += response.getRowCount(); + } + + // Verifies that the number of rows skipped and read equals to the total number of rows in the + // table. + assertEquals(164_656, rowCount); + } + + @Test + public void testFilter() throws IOException { + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ "bigquery-public-data", + /* datasetId = */ "samples", + /* tableId = */ "shakespeare"); + + TableReadOptions options = + TableReadOptions.newBuilder().setRowRestriction("word_count > 100").build(); + + CreateReadSessionRequest request = + CreateReadSessionRequest.newBuilder() + .setParent(parentProjectId) + .setMaxStreamCount(1) + .setReadSession( + ReadSession.newBuilder() + .setTable(table) + .setReadOptions(options) + .setDataFormat(DataFormat.AVRO) + .build()) + .build(); + + ReadSession session = client.createReadSession(request); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", + table, session.toString()), + 1, + session.getStreamsCount()); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); + + SimpleRowReader reader = + new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema())); + + long rowCount = 0; + + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + rowCount += response.getRowCount(); + reader.processRows( + response.getAvroRows(), + new AvroRowConsumer() { + @Override + public void accept(GenericData.Record record) { 
+ Long wordCount = (Long) record.get("word_count"); + assertWithMessage("Row not matching expectations: %s", record.toString()) + .that(wordCount) + .isGreaterThan(100L); + } + }); + } + + assertEquals(1_333, rowCount); + } + + @Test + public void testColumnSelection() throws IOException { + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ "bigquery-public-data", + /* datasetId = */ "samples", + /* tableId = */ "shakespeare"); + + TableReadOptions options = + TableReadOptions.newBuilder() + .addSelectedFields("word") + .addSelectedFields("word_count") + .setRowRestriction("word_count > 100") + .build(); + + CreateReadSessionRequest request = + CreateReadSessionRequest.newBuilder() + .setParent(parentProjectId) + .setMaxStreamCount(1) + .setReadSession( + ReadSession.newBuilder() + .setTable(table) + .setReadOptions(options) + .setDataFormat(DataFormat.AVRO) + .build()) + .build(); + + ReadSession session = client.createReadSession(request); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", + table, session.toString()), + 1, + session.getStreamsCount()); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); + + Schema avroSchema = new Schema.Parser().parse(session.getAvroSchema().getSchema()); + + String actualSchemaMessage = + String.format( + "Unexpected schema. 
Actual schema:%n%s", avroSchema.toString(/* pretty = */ true)); + assertEquals(actualSchemaMessage, Schema.Type.RECORD, avroSchema.getType()); + assertEquals(actualSchemaMessage, "__root__", avroSchema.getName()); + + assertEquals(actualSchemaMessage, 2, avroSchema.getFields().size()); + assertEquals( + actualSchemaMessage, Schema.Type.STRING, avroSchema.getField("word").schema().getType()); + assertEquals( + actualSchemaMessage, + Schema.Type.LONG, + avroSchema.getField("word_count").schema().getType()); + + SimpleRowReader reader = new SimpleRowReader(avroSchema); + + long rowCount = 0; + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + rowCount += response.getRowCount(); + reader.processRows( + response.getAvroRows(), + new AvroRowConsumer() { + @Override + public void accept(GenericData.Record record) { + String rowAssertMessage = + String.format("Row not matching expectations: %s", record.toString()); + + Long wordCount = (Long) record.get("word_count"); + assertWithMessage(rowAssertMessage).that(wordCount).isGreaterThan(100L); + + Utf8 word = (Utf8) record.get("word"); + assertWithMessage(rowAssertMessage).that(word.length()).isGreaterThan(0); + } + }); + } + + assertEquals(1_333, rowCount); + } + + @Test + public void testReadAtSnapshot() throws InterruptedException, IOException { + Field intFieldSchema = + Field.newBuilder("col", LegacySQLTypeName.INTEGER) + .setMode(Mode.REQUIRED) + .setDescription("IntegerDescription") + .build(); + com.google.cloud.bigquery.Schema tableSchema = + com.google.cloud.bigquery.Schema.of(intFieldSchema); + + TableId testTableId = TableId.of(/* dataset = */ DATASET, /* table = */ "test_read_snapshot"); + bigquery.create(TableInfo.of(testTableId, StandardTableDefinition.of(tableSchema))); + + testTableId.toString(); + + Job firstJob = + RunQueryAppendJobAndExpectSuccess( + /* destinationTableId = */ testTableId, /* query = */ "SELECT 1 AS col"); + + Job 
secondJob = + RunQueryAppendJobAndExpectSuccess( + /* destinationTableId = */ testTableId, /* query = */ "SELECT 2 AS col"); + + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* datasetId = */ DATASET, + /* tableId = */ testTableId.getTable()); + + final List rowsAfterFirstSnapshot = new ArrayList<>(); + ProcessRowsAtSnapshot( + /* table = */ table, + /* snapshotInMillis = */ firstJob.getStatistics().getEndTime(), + /* filter = */ null, + /* consumer = */ new AvroRowConsumer() { + @Override + public void accept(GenericData.Record record) { + rowsAfterFirstSnapshot.add((Long) record.get("col")); + } + }); + assertEquals(Arrays.asList(1L), rowsAfterFirstSnapshot); + + final List rowsAfterSecondSnapshot = new ArrayList<>(); + ProcessRowsAtSnapshot( + /* table = */ table, + /* snapshotInMillis = */ secondJob.getStatistics().getEndTime(), + /* filter = */ null, + /* consumer = */ new AvroRowConsumer() { + @Override + public void accept(GenericData.Record record) { + rowsAfterSecondSnapshot.add((Long) record.get("col")); + } + }); + Collections.sort(rowsAfterSecondSnapshot); + assertEquals(Arrays.asList(1L, 2L), rowsAfterSecondSnapshot); + } + + @Test + public void testColumnPartitionedTableByDateField() throws InterruptedException, IOException { + String partitionedTableName = "test_column_partition_table_by_date"; + String createTableStatement = + String.format( + " CREATE TABLE %s.%s (num_field INT64, date_field DATE) " + + " PARTITION BY date_field " + + " OPTIONS( " + + " description=\"a table partitioned by date_field\" " + + " ) " + + "AS " + + " SELECT 1, CAST(\"2019-01-01\" AS DATE)" + + " UNION ALL" + + " SELECT 2, CAST(\"2019-01-02\" AS DATE)" + + " UNION ALL" + + " SELECT 3, CAST(\"2019-01-03\" AS DATE)", + DATASET, partitionedTableName); + + RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); + + String table = + BigQueryResource.FormatTableResource( 
+ /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* datasetId = */ DATASET, + /* tableId = */ partitionedTableName); + + List unfilteredRows = ReadAllRows(/* table = */ table, /* filter = */ null); + assertEquals("Actual rows read: " + unfilteredRows.toString(), 3, unfilteredRows.size()); + + List partitionFilteredRows = + ReadAllRows( + /* table = */ table, /* filter = */ "date_field = CAST(\"2019-01-02\" AS DATE)"); + assertEquals( + "Actual rows read: " + partitionFilteredRows.toString(), 1, partitionFilteredRows.size()); + assertEquals(2L, partitionFilteredRows.get(0).get("num_field")); + } + + @Test + public void testIngestionTimePartitionedTable() throws InterruptedException, IOException { + Field intFieldSchema = + Field.newBuilder("num_field", LegacySQLTypeName.INTEGER) + .setMode(Mode.REQUIRED) + .setDescription("IntegerDescription") + .build(); + com.google.cloud.bigquery.Schema tableSchema = + com.google.cloud.bigquery.Schema.of(intFieldSchema); + + TableId testTableId = + TableId.of(/* dataset = */ DATASET, /* table = */ "test_date_partitioned_table"); + bigquery.create( + TableInfo.of( + testTableId, + StandardTableDefinition.newBuilder() + .setTimePartitioning(TimePartitioning.of(TimePartitioning.Type.DAY)) + .setSchema(tableSchema) + .build())); + + // Simulate ingestion for 2019-01-01. + RunQueryAppendJobAndExpectSuccess( + /* destinationTableId = */ TableId.of( + /* dataset = */ DATASET, /* table = */ testTableId.getTable() + "$20190101"), + /* query = */ "SELECT 1 AS num_field"); + + // Simulate ingestion for 2019-01-02. 
+ RunQueryAppendJobAndExpectSuccess( + /* destinationTableId = */ TableId.of( + /* dataset = */ DATASET, /* table = */ testTableId.getTable() + "$20190102"), + /* query = */ "SELECT 2 AS num_field"); + + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* datasetId = */ testTableId.getDataset(), + /* tableId = */ testTableId.getTable()); + + List unfilteredRows = ReadAllRows(/* table = */ table, /* filter = */ null); + assertEquals("Actual rows read: " + unfilteredRows.toString(), 2, unfilteredRows.size()); + + List partitionFilteredRows = + ReadAllRows(/* table = */ table, /* filter = */ "_PARTITIONDATE > \"2019-01-01\""); + assertEquals( + "Actual rows read: " + partitionFilteredRows.toString(), 1, partitionFilteredRows.size()); + assertEquals(2L, partitionFilteredRows.get(0).get("num_field")); + } + + @Test + public void testBasicSqlTypes() throws InterruptedException, IOException { + String tableName = "test_basic_sql_types"; + String createTableStatement = + String.format( + " CREATE TABLE %s.%s " + + " (int_field INT64 NOT NULL," + + " num_field NUMERIC NOT NULL," + + " float_field FLOAT64 NOT NULL," + + " bool_field BOOL NOT NULL," + + " str_field STRING NOT NULL," + + " bytes_field BYTES NOT NULL) " + + " OPTIONS( " + + " description=\"a table with basic column types\" " + + " ) " + + "AS " + + " SELECT " + + " 17," + + " CAST(1234.56 AS NUMERIC)," + + " 6.547678," + + " TRUE," + + " \"String field value\"," + + " b\"абвгд\"", + DATASET, tableName); + + RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); + + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* datasetId = */ DATASET, + /* tableId = */ tableName); + + List rows = ReadAllRows(/* table = */ table, /* filter = */ null); + assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); + + GenericData.Record record = 
rows.get(0); + Schema avroSchema = record.getSchema(); + + String actualSchemaMessage = + String.format( + "Unexpected schema. Actual schema:%n%s", avroSchema.toString(/* pretty = */ true)); + String rowAssertMessage = String.format("Row not matching expectations: %s", record.toString()); + + assertEquals(actualSchemaMessage, Schema.Type.RECORD, avroSchema.getType()); + assertEquals(actualSchemaMessage, "__root__", avroSchema.getName()); + assertEquals(actualSchemaMessage, 6, avroSchema.getFields().size()); + + assertEquals( + actualSchemaMessage, Schema.Type.LONG, avroSchema.getField("int_field").schema().getType()); + assertEquals(rowAssertMessage, 17L, (long) record.get("int_field")); + + assertEquals( + actualSchemaMessage, + Schema.Type.BYTES, + avroSchema.getField("num_field").schema().getType()); + assertEquals( + actualSchemaMessage, + LogicalTypes.decimal(/* precision = */ 38, /* scale = */ 9), + avroSchema.getField("num_field").schema().getLogicalType()); + BigDecimal actual_num_field = + new Conversions.DecimalConversion() + .fromBytes( + (ByteBuffer) record.get("num_field"), + avroSchema, + avroSchema.getField("num_field").schema().getLogicalType()); + assertEquals( + rowAssertMessage, + BigDecimal.valueOf(/* unscaledVal = */ 1_234_560_000_000L, /* scale = */ 9), + actual_num_field); + + assertEquals( + actualSchemaMessage, + Schema.Type.DOUBLE, + avroSchema.getField("float_field").schema().getType()); + assertEquals( + rowAssertMessage, + /* expected = */ 6.547678d, + /* actual = */ (double) record.get("float_field"), + /* delta = */ 0.0001); + + assertEquals( + actualSchemaMessage, + Schema.Type.BOOLEAN, + avroSchema.getField("bool_field").schema().getType()); + assertEquals(rowAssertMessage, true, record.get("bool_field")); + + assertEquals( + actualSchemaMessage, + Schema.Type.STRING, + avroSchema.getField("str_field").schema().getType()); + assertEquals(rowAssertMessage, new Utf8("String field value"), record.get("str_field")); + + assertEquals( + 
actualSchemaMessage, + Schema.Type.BYTES, + avroSchema.getField("bytes_field").schema().getType()); + assertArrayEquals( + rowAssertMessage, + Utf8.getBytesFor("абвгд"), + ((ByteBuffer) (record.get("bytes_field"))).array()); + } + + @Test + public void testDateAndTimeSqlTypes() throws InterruptedException, IOException { + String tableName = "test_date_and_time_sql_types"; + String createTableStatement = + String.format( + " CREATE TABLE %s.%s " + + " (date_field DATE NOT NULL," + + " datetime_field DATETIME NOT NULL," + + " time_field TIME NOT NULL," + + " timestamp_field TIMESTAMP NOT NULL)" + + " OPTIONS( " + + " description=\"a table with date and time column types\" " + + " ) " + + "AS " + + " SELECT " + + " CAST(\"2019-05-31\" AS DATE)," + + " CAST(\"2019-04-30 21:47:59.999999\" AS DATETIME)," + + " CAST(\"21:47:59.999999\" AS TIME)," + + " CAST(\"2019-04-30 19:24:19.123456 UTC\" AS TIMESTAMP)", + DATASET, tableName); + + RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); + + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* datasetId = */ DATASET, + /* tableId = */ tableName); + + List rows = ReadAllRows(/* table = */ table, /* filter = */ null); + assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); + + GenericData.Record record = rows.get(0); + Schema avroSchema = record.getSchema(); + + String actualSchemaMessage = + String.format( + "Unexpected schema. 
Actual schema:%n%s", avroSchema.toString(/* pretty = */ true)); + String rowAssertMessage = String.format("Row not matching expectations: %s", record.toString()); + + assertEquals(actualSchemaMessage, Schema.Type.RECORD, avroSchema.getType()); + assertEquals(actualSchemaMessage, "__root__", avroSchema.getName()); + assertEquals(actualSchemaMessage, 4, avroSchema.getFields().size()); + + assertEquals( + actualSchemaMessage, Schema.Type.INT, avroSchema.getField("date_field").schema().getType()); + assertEquals( + actualSchemaMessage, + LogicalTypes.date(), + avroSchema.getField("date_field").schema().getLogicalType()); + assertEquals( + rowAssertMessage, + LocalDate.of(/* year = */ 2019, /* month = */ 5, /* dayOfMonth = */ 31), + LocalDate.ofEpochDay((int) record.get("date_field"))); + + assertEquals( + actualSchemaMessage, + Schema.Type.STRING, + avroSchema.getField("datetime_field").schema().getType()); + assertEquals( + actualSchemaMessage, + "datetime", + avroSchema.getField("datetime_field").schema().getObjectProp("logicalType")); + assertEquals( + rowAssertMessage, + new Utf8("2019-04-30T21:47:59.999999"), + (Utf8) record.get("datetime_field")); + + assertEquals( + actualSchemaMessage, + Schema.Type.LONG, + avroSchema.getField("time_field").schema().getType()); + assertEquals( + actualSchemaMessage, + LogicalTypes.timeMicros(), + avroSchema.getField("time_field").schema().getLogicalType()); + assertEquals( + rowAssertMessage, + LocalTime.of( + /* hour = */ 21, + /* minute = */ 47, + /* second = */ 59, + /* nanoOfSecond = */ 999_999_000), + LocalTime.ofNanoOfDay(1_000L * (long) record.get("time_field"))); + + assertEquals( + actualSchemaMessage, + Schema.Type.LONG, + avroSchema.getField("timestamp_field").schema().getType()); + assertEquals( + actualSchemaMessage, + LogicalTypes.timestampMicros(), + avroSchema.getField("timestamp_field").schema().getLogicalType()); + ZonedDateTime expected_timestamp = + ZonedDateTime.parse( + "2019-04-30T19:24:19Z", 
DateTimeFormatter.ISO_INSTANT.withZone(ZoneOffset.UTC)) + .withNano(123_456_000); + long actual_timestamp_micros = (long) record.get("timestamp_field"); + ZonedDateTime actual_timestamp = + ZonedDateTime.ofInstant( + Instant.ofEpochSecond( + /* epochSecond = */ actual_timestamp_micros / 1_000_000, + (actual_timestamp_micros % 1_000_000) * 1_000), + ZoneOffset.UTC); + assertEquals(rowAssertMessage, expected_timestamp, actual_timestamp); + } + + @Test + public void testGeographySqlType() throws InterruptedException, IOException { + String tableName = "test_geography_sql_type"; + String createTableStatement = + String.format( + " CREATE TABLE %s.%s " + + " (geo_field GEOGRAPHY NOT NULL)" + + " OPTIONS( " + + " description=\"a table with a geography column type\" " + + " ) " + + "AS " + + " SELECT ST_GEOGPOINT(1.1, 2.2)", + DATASET, tableName); + + RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); + + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* datasetId = */ DATASET, + /* tableId = */ tableName); + + List rows = ReadAllRows(/* table = */ table, /* filter = */ null); + assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); + + GenericData.Record record = rows.get(0); + Schema avroSchema = record.getSchema(); + + String actualSchemaMessage = + String.format( + "Unexpected schema. 
Actual schema:%n%s", avroSchema.toString(/* pretty = */ true)); + String rowAssertMessage = String.format("Row not matching expectations: %s", record.toString()); + + assertEquals(actualSchemaMessage, Schema.Type.RECORD, avroSchema.getType()); + assertEquals(actualSchemaMessage, "__root__", avroSchema.getName()); + assertEquals(actualSchemaMessage, 1, avroSchema.getFields().size()); + + assertEquals( + actualSchemaMessage, + Schema.Type.STRING, + avroSchema.getField("geo_field").schema().getType()); + assertEquals( + actualSchemaMessage, + "GEOGRAPHY", + avroSchema.getField("geo_field").schema().getObjectProp("sqlType")); + assertEquals(rowAssertMessage, new Utf8("POINT(1.1 2.2)"), (Utf8) record.get("geo_field")); + } + + @Test + public void testStructAndArraySqlTypes() throws InterruptedException, IOException { + String tableName = "test_struct_and_array_sql_types"; + String createTableStatement = + String.format( + " CREATE TABLE %s.%s " + + " (array_field ARRAY," + + " struct_field STRUCT NOT NULL)" + + " OPTIONS( " + + " description=\"a table with array and time column types\" " + + " ) " + + "AS " + + " SELECT " + + " [1, 2, 3]," + + " (10, 'abc')", + DATASET, tableName); + + RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); + + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* datasetId = */ DATASET, + /* tableId = */ tableName); + + List rows = ReadAllRows(/* table = */ table, /* filter = */ null); + assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); + + GenericData.Record record = rows.get(0); + Schema avroSchema = record.getSchema(); + + String actualSchemaMessage = + String.format( + "Unexpected schema. 
Actual schema:%n%s", avroSchema.toString(/* pretty = */ true)); + String rowAssertMessage = String.format("Row not matching expectations: %s", record.toString()); + + assertEquals(actualSchemaMessage, Schema.Type.RECORD, avroSchema.getType()); + assertEquals(actualSchemaMessage, "__root__", avroSchema.getName()); + assertEquals(actualSchemaMessage, 2, avroSchema.getFields().size()); + + assertEquals( + actualSchemaMessage, + Schema.Type.ARRAY, + avroSchema.getField("array_field").schema().getType()); + assertEquals( + actualSchemaMessage, + Schema.Type.LONG, + avroSchema.getField("array_field").schema().getElementType().getType()); + assertArrayEquals( + rowAssertMessage, + new Long[] {1L, 2L, 3L}, + ((GenericData.Array) record.get("array_field")).toArray(new Long[0])); + + // Validate the STRUCT field and its members. + Schema structSchema = avroSchema.getField("struct_field").schema(); + assertEquals(actualSchemaMessage, Schema.Type.RECORD, structSchema.getType()); + GenericData.Record structRecord = (GenericData.Record) record.get("struct_field"); + + assertEquals( + actualSchemaMessage, + Schema.Type.LONG, + structSchema.getField("int_field").schema().getType()); + assertEquals(rowAssertMessage, 10L, (long) structRecord.get("int_field")); + + assertEquals( + actualSchemaMessage, + Schema.Type.STRING, + structSchema.getField("str_field").schema().getType()); + assertEquals(rowAssertMessage, new Utf8("abc"), structRecord.get("str_field")); + } + + /** + * Reads to the specified row offset within the stream. If the stream does not have the desired + * rows to read, it will read all of them. + * + * @param readStream + * @param rowOffset + * @return the number of requested rows to skip or the total rows read if stream had less rows. 
+ */ + private long ReadStreamToOffset(ReadStream readStream, long rowOffset) { + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build(); + + long rowCount = 0; + ServerStream serverStream = client.readRowsCallable().call(readRowsRequest); + Iterator responseIterator = serverStream.iterator(); + + while (responseIterator.hasNext()) { + ReadRowsResponse response = responseIterator.next(); + rowCount += response.getRowCount(); + if (rowCount >= rowOffset) { + return rowOffset; + } + } + + return rowCount; + } + + /** + * Reads all the rows from the specified table. + * + *

For every row, the consumer is called for processing. + * + * @param table + * @param snapshotInMillis Optional. If specified, all rows up to timestamp will be returned. + * @param filter Optional. If specified, it will be used to restrict returned data. + * @param consumer that receives all Avro rows. + * @throws IOException + */ + private void ProcessRowsAtSnapshot( + String table, Long snapshotInMillis, String filter, AvroRowConsumer consumer) + throws IOException { + Preconditions.checkNotNull(table); + Preconditions.checkNotNull(consumer); + + CreateReadSessionRequest.Builder createSessionRequestBuilder = + CreateReadSessionRequest.newBuilder() + .setParent(parentProjectId) + .setMaxStreamCount(1) + .setReadSession( + ReadSession.newBuilder().setTable(table).setDataFormat(DataFormat.AVRO).build()); + + if (snapshotInMillis != null) { + Timestamp snapshotTimestamp = + Timestamp.newBuilder() + .setSeconds(snapshotInMillis / 1_000) + .setNanos((int) ((snapshotInMillis % 1000) * 1000000)) + .build(); + createSessionRequestBuilder + .getReadSessionBuilder() + .setTableModifiers( + TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build()); + } + + if (filter != null && !filter.isEmpty()) { + createSessionRequestBuilder + .getReadSessionBuilder() + .setReadOptions(TableReadOptions.newBuilder().setRowRestriction(filter).build()); + } + + ReadSession session = client.createReadSession(createSessionRequestBuilder.build()); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", + table, session.toString()), + 1, + session.getStreamsCount()); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); + + SimpleRowReader reader = + new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema())); + + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : 
stream) { + reader.processRows(response.getAvroRows(), consumer); + } + } + + /** + * Reads all the rows from the specified table and returns a list as generic Avro records. + * + * @param table + * @param filter Optional. If specified, it will be used to restrict returned data. + * @return + */ + List ReadAllRows(String table, String filter) throws IOException { + final List rows = new ArrayList<>(); + ProcessRowsAtSnapshot( + /* table = */ table, + /* snapshotInMillis = */ null, + /* filter = */ filter, + new AvroRowConsumer() { + @Override + public void accept(GenericData.Record record) { + // clone the record since that reference will be reused by the reader. + rows.add(new GenericRecordBuilder(record).build()); + } + }); + return rows; + } + + /** + * Runs a query job with WRITE_APPEND disposition to the destination table and returns the + * successfully completed job. + * + * @param destinationTableId + * @param query + * @return + * @throws InterruptedException + */ + private Job RunQueryAppendJobAndExpectSuccess(TableId destinationTableId, String query) + throws InterruptedException { + return RunQueryJobAndExpectSuccess( + QueryJobConfiguration.newBuilder(query) + .setDestinationTable(destinationTableId) + .setUseQueryCache(false) + .setUseLegacySql(false) + .setWriteDisposition(WriteDisposition.WRITE_APPEND) + .build()); + } + + /** + * Runs a query job with provided configuration and returns the successfully completed job. 
+ * + * @param configuration + * @return + * @throws InterruptedException + */ + private Job RunQueryJobAndExpectSuccess(QueryJobConfiguration configuration) + throws InterruptedException { + Job job = bigquery.create(JobInfo.of(configuration)); + Job completedJob = + job.waitFor( + RetryOption.initialRetryDelay(Duration.ofSeconds(1)), + RetryOption.totalTimeout(Duration.ofMinutes(1))); + + assertNotNull(completedJob); + assertNull( + /* message = */ "Received a job status that is not a success: " + + completedJob.getStatus().toString(), + /* object = */ completedJob.getStatus().getError()); + + return completedJob; + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReader.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReader.java new file mode 100644 index 0000000000..8b72461b15 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReader.java @@ -0,0 +1,77 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.bigquery.storage.v1.it; + +import com.google.cloud.bigquery.storage.v1.AvroRows; +import com.google.common.base.Preconditions; +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; + +/* + * SimpleRowReader handles deserialization of the Avro-encoded row blocks transmitted + * from the storage API using a generic datum decoder. + */ +public class SimpleRowReader { + + public interface AvroRowConsumer { + + /** + * Handler for every new Avro row that is read. + * + * @param record is Avro generic record structure. Consumers should not rely on the reference + * and should copy it if needed. The record reference is reused. + */ + void accept(GenericData.Record record); + } + + private final DatumReader datumReader; + + // Decoder object will be reused to avoid re-allocation and too much garbage collection. + private BinaryDecoder decoder = null; + + // Record object will be reused. + private GenericData.Record row = null; + + public SimpleRowReader(Schema schema) { + Preconditions.checkNotNull(schema); + datumReader = new GenericDatumReader<>(schema); + } + + /** + * Processes Avro rows by calling a consumer for each decoded row. + * + * @param avroRows object returned from the ReadRowsResponse. + * @param rowConsumer consumer that accepts GenericRecord. 
+ */ + public void processRows(AvroRows avroRows, AvroRowConsumer rowConsumer) throws IOException { + Preconditions.checkNotNull(avroRows); + Preconditions.checkNotNull(rowConsumer); + decoder = + DecoderFactory.get() + .binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), decoder); + + while (!decoder.isEnd()) { + row = datumReader.read(row, decoder); + rowConsumer.accept(row); + } + } +} From 2bbbb655101ed16d59b68a31fffd5c85859e21ef Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Fri, 7 Feb 2020 15:49:46 -0500 Subject: [PATCH 4/4] chore: release 0.124.0 (#69) * updated CHANGELOG.md [ci skip] * updated README.md [ci skip] * updated versions.txt [ci skip] * updated google-cloud-bigquerystorage-bom/pom.xml [ci skip] * updated google-cloud-bigquerystorage/pom.xml [ci skip] * updated grpc-google-cloud-bigquerystorage-v1/pom.xml [ci skip] * updated grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml [ci skip] * updated grpc-google-cloud-bigquerystorage-v1beta1/pom.xml [ci skip] * updated grpc-google-cloud-bigquerystorage-v1beta2/pom.xml [ci skip] * updated pom.xml [ci skip] * updated proto-google-cloud-bigquerystorage-v1/pom.xml [ci skip] * updated proto-google-cloud-bigquerystorage-v1alpha2/pom.xml [ci skip] * updated proto-google-cloud-bigquerystorage-v1beta1/pom.xml [ci skip] * updated proto-google-cloud-bigquerystorage-v1beta2/pom.xml [ci skip] --- CHANGELOG.md | 8 ++++++++ README.md | 6 +++--- google-cloud-bigquerystorage-bom/pom.xml | 20 +++++++++---------- google-cloud-bigquerystorage/pom.xml | 4 ++-- grpc-google-cloud-bigquerystorage-v1/pom.xml | 4 ++-- .../pom.xml | 4 ++-- .../pom.xml | 4 ++-- .../pom.xml | 4 ++-- pom.xml | 20 +++++++++---------- proto-google-cloud-bigquerystorage-v1/pom.xml | 4 ++-- .../pom.xml | 4 ++-- .../pom.xml | 4 ++-- .../pom.xml | 4 ++-- versions.txt | 18 ++++++++--------- 14 files changed, 58 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md 
b/CHANGELOG.md index 012fce2628..76382e4bc3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## [0.124.0](https://www.github.com/googleapis/java-bigquerystorage/compare/v0.123.0...v0.124.0) (2020-02-07) + + +### Features + +* add an enhanced layer for BigQuery Storage v1 client ([#66](https://www.github.com/googleapis/java-bigquerystorage/issues/66)) ([43fc284](https://www.github.com/googleapis/java-bigquerystorage/commit/43fc284e00ddbc9a018d734e3f6f09c82ebd92d4)) +* add v1 integration tests ([#68](https://www.github.com/googleapis/java-bigquerystorage/issues/68)) ([8e7ac15](https://www.github.com/googleapis/java-bigquerystorage/commit/8e7ac1511b9f9eaea417e6761848e4735039a831)) + ## [0.123.0](https://www.github.com/googleapis/java-bigquerystorage/compare/v0.122.0...v0.123.0) (2020-02-06) diff --git a/README.md b/README.md index 680de57484..b1c460f4b9 100644 --- a/README.md +++ b/README.md @@ -38,16 +38,16 @@ If you are using Maven without a BOM, add this to your dependencies. 
com.google.cloud google-cloud-bigquerystorage - 0.123.0-beta + 0.124.0-beta ``` If you are using Gradle, add this to your dependencies ```Groovy -compile 'com.google.cloud:google-cloud-bigquerystorage:0.123.0-beta' +compile 'com.google.cloud:google-cloud-bigquerystorage:0.124.0-beta' ``` If you are using SBT, add this to your dependencies ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "0.123.0-beta" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "0.124.0-beta" ``` [//]: # ({x-version-update-end}) diff --git a/google-cloud-bigquerystorage-bom/pom.xml b/google-cloud-bigquerystorage-bom/pom.xml index 00135e684a..bb0dcc683b 100644 --- a/google-cloud-bigquerystorage-bom/pom.xml +++ b/google-cloud-bigquerystorage-bom/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-bigquerystorage-bom - 0.123.1-beta-SNAPSHOT + 0.124.0-beta pom com.google.cloud @@ -63,48 +63,48 @@ com.google.api.grpc proto-google-cloud-bigquerystorage-v1alpha2 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1alpha2 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 0.88.1-SNAPSHOT + 0.89.0 com.google.cloud google-cloud-bigquerystorage - 0.123.1-beta-SNAPSHOT + 0.124.0-beta diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index 6da3d004f5..99de990588 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud 
google-cloud-bigquerystorage - 0.123.1-beta-SNAPSHOT + 0.124.0-beta jar BigQuery Storage https://github.com/googleapis/java-bigquerystorage @@ -11,7 +11,7 @@ com.google.cloud google-cloud-bigquerystorage-parent - 0.123.1-beta-SNAPSHOT + 0.124.0-beta google-cloud-bigquerystorage diff --git a/grpc-google-cloud-bigquerystorage-v1/pom.xml b/grpc-google-cloud-bigquerystorage-v1/pom.xml index 515f776626..b504945a28 100644 --- a/grpc-google-cloud-bigquerystorage-v1/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 0.88.1-SNAPSHOT + 0.89.0 grpc-google-cloud-bigquerystorage-v1 GRPC library for grpc-google-cloud-bigquerystorage-v1 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.1-beta-SNAPSHOT + 0.124.0-beta diff --git a/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml b/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml index af828b8a88..2d92762632 100644 --- a/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1alpha2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1alpha2 - 0.88.1-SNAPSHOT + 0.89.0 grpc-google-cloud-bigquerystorage-v1alpha2 GRPC library for grpc-google-cloud-bigquerystorage-v1alpha2 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.1-beta-SNAPSHOT + 0.124.0-beta diff --git a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml index a87997ed7a..b4105bf829 100644 --- a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.88.1-SNAPSHOT + 0.89.0 grpc-google-cloud-bigquerystorage-v1beta1 GRPC library for grpc-google-cloud-bigquerystorage-v1beta1 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.1-beta-SNAPSHOT + 0.124.0-beta diff --git 
a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml index b8e553ed9d..ed1af068c7 100644 --- a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml +++ b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.88.1-SNAPSHOT + 0.89.0 grpc-google-cloud-bigquerystorage-v1beta2 GRPC library for grpc-google-cloud-bigquerystorage-v1beta2 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.1-beta-SNAPSHOT + 0.124.0-beta diff --git a/pom.xml b/pom.xml index 0207db9bf1..b874e33083 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.google.cloud google-cloud-bigquerystorage-parent pom - 0.123.1-beta-SNAPSHOT + 0.124.0-beta BigQuery Storage Parent https://github.com/googleapis/java-bigquerystorage @@ -83,43 +83,43 @@ com.google.api.grpc proto-google-cloud-bigquerystorage-v1alpha2 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1alpha2 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta1 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - 0.88.1-SNAPSHOT + 0.89.0 com.google.api.grpc grpc-google-cloud-bigquerystorage-v1 - 0.88.1-SNAPSHOT + 0.89.0 com.google.auto.value @@ -134,7 +134,7 @@ com.google.cloud google-cloud-bigquerystorage - 0.123.1-beta-SNAPSHOT + 0.124.0-beta diff --git a/proto-google-cloud-bigquerystorage-v1/pom.xml b/proto-google-cloud-bigquerystorage-v1/pom.xml index 4c7c9658ee..220bf1cfac 100644 --- a/proto-google-cloud-bigquerystorage-v1/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc 
proto-google-cloud-bigquerystorage-v1 - 0.88.1-SNAPSHOT + 0.89.0 proto-google-cloud-bigquerystorage-v1 PROTO library for proto-google-cloud-bigquerystorage-v1 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.1-beta-SNAPSHOT + 0.124.0-beta diff --git a/proto-google-cloud-bigquerystorage-v1alpha2/pom.xml b/proto-google-cloud-bigquerystorage-v1alpha2/pom.xml index 520bbc9a93..0e8c8cf56d 100644 --- a/proto-google-cloud-bigquerystorage-v1alpha2/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1alpha2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1alpha2 - 0.88.1-SNAPSHOT + 0.89.0 proto-google-cloud-bigquerystorage-v1alpha2 PROTO library for proto-google-cloud-bigquerystorage-v1alpha2 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.1-beta-SNAPSHOT + 0.124.0-beta diff --git a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml index 6d2bf288f3..dae15b411f 100644 --- a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta1 - 0.88.1-SNAPSHOT + 0.89.0 proto-google-cloud-bigquerystorage-v1beta1 PROTO library for proto-google-cloud-bigquerystorage-v1beta1 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.1-beta-SNAPSHOT + 0.124.0-beta diff --git a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml index 68381cc61d..ea51437dbc 100644 --- a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml +++ b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.google.api.grpc proto-google-cloud-bigquerystorage-v1beta2 - 0.88.1-SNAPSHOT + 0.89.0 proto-google-cloud-bigquerystorage-v1beta2 PROTO library for proto-google-cloud-bigquerystorage-v1beta2 com.google.cloud google-cloud-bigquerystorage-parent - 0.123.1-beta-SNAPSHOT + 0.124.0-beta diff --git 
a/versions.txt b/versions.txt index 9bd37c5f8b..930dbbf7c1 100644 --- a/versions.txt +++ b/versions.txt @@ -1,12 +1,12 @@ # Format: # module:released-version:current-version -proto-google-cloud-bigquerystorage-v1alpha2:0.88.0:0.88.1-SNAPSHOT -proto-google-cloud-bigquerystorage-v1beta1:0.88.0:0.88.1-SNAPSHOT -proto-google-cloud-bigquerystorage-v1beta2:0.88.0:0.88.1-SNAPSHOT -proto-google-cloud-bigquerystorage-v1:0.88.0:0.88.1-SNAPSHOT -grpc-google-cloud-bigquerystorage-v1alpha2:0.88.0:0.88.1-SNAPSHOT -grpc-google-cloud-bigquerystorage-v1beta1:0.88.0:0.88.1-SNAPSHOT -grpc-google-cloud-bigquerystorage-v1beta2:0.88.0:0.88.1-SNAPSHOT -grpc-google-cloud-bigquerystorage-v1:0.88.0:0.88.1-SNAPSHOT -google-cloud-bigquerystorage:0.123.0-beta:0.123.1-beta-SNAPSHOT +proto-google-cloud-bigquerystorage-v1alpha2:0.89.0:0.89.0 +proto-google-cloud-bigquerystorage-v1beta1:0.89.0:0.89.0 +proto-google-cloud-bigquerystorage-v1beta2:0.89.0:0.89.0 +proto-google-cloud-bigquerystorage-v1:0.89.0:0.89.0 +grpc-google-cloud-bigquerystorage-v1alpha2:0.89.0:0.89.0 +grpc-google-cloud-bigquerystorage-v1beta1:0.89.0:0.89.0 +grpc-google-cloud-bigquerystorage-v1beta2:0.89.0:0.89.0 +grpc-google-cloud-bigquerystorage-v1:0.89.0:0.89.0 +google-cloud-bigquerystorage:0.124.0-beta:0.124.0-beta