Lokasi ngalangkungan proxy:   [ UP ]  
[Ngawartoskeun bug]   [Panyetelan cookie]                
Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 691f078

Browse files
fix: JsonWriter accepts string input for DATETIME, TIME, NUMERIC, BIGNUMERIC field (#1339)
* fix: update code comment to reflect max size change * fix: JsonWriter support string DateTime, Time, Numeric, BigNumeric * . * . * fix format * remove a test that is covered by JsonToProtoMessageTest * . * . * . * remove v1 test that is failing due to test proto update, test coverage will be added when the additional type support is ported to v1 * . * . * . * 🦉 Updates from OwlBot See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent db2ca42 commit 691f078

8 files changed

Lines changed: 624 additions & 454 deletions

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ private JsonStreamWriter(Builder builder)
7575
builder.traceId);
7676
this.streamWriter = streamWriterBuilder.build();
7777
this.streamName = builder.streamName;
78+
this.tableSchema = builder.tableSchema;
7879
}
7980

8081
/**
@@ -105,7 +106,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
105106
// of JSON data.
106107
for (int i = 0; i < jsonArr.length(); i++) {
107108
JSONObject json = jsonArr.getJSONObject(i);
108-
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
109+
Message protoMessage =
110+
JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, this.tableSchema, json);
109111
rowsBuilder.addSerializedRows(protoMessage.toByteString());
110112
}
111113
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessage.java

Lines changed: 164 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.bigquery.storage.v1beta2;
1717

18+
import com.google.api.pathtemplate.ValidationException;
1819
import com.google.common.base.Preconditions;
1920
import com.google.common.collect.ImmutableMap;
2021
import com.google.protobuf.ByteString;
@@ -23,10 +24,14 @@
2324
import com.google.protobuf.DynamicMessage;
2425
import com.google.protobuf.Message;
2526
import com.google.protobuf.UninitializedMessageException;
27+
import java.math.BigDecimal;
28+
import java.util.List;
2629
import java.util.logging.Logger;
2730
import org.json.JSONArray;
2831
import org.json.JSONException;
2932
import org.json.JSONObject;
33+
import org.threeten.bp.LocalDateTime;
34+
import org.threeten.bp.LocalTime;
3035

3136
/**
3237
* Converts Json data to protocol buffer messages given the protocol buffer descriptor. The protobuf
@@ -58,7 +63,28 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
5863
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
5964
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");
6065

61-
return convertJsonToProtoMessageImpl(protoSchema, json, "root", /*topLevel=*/ true);
66+
return convertJsonToProtoMessageImpl(protoSchema, null, json, "root", /*topLevel=*/ true);
67+
}
68+
69+
/**
70+
* Converts Json data to protocol buffer messages given the protocol buffer descriptor.
71+
*
72+
* @param protoSchema
73+
* @param tableSchema bigquery table schema is needed for type conversion of DATETIME, TIME,
74+
* NUMERIC, BIGNUMERIC
75+
* @param json
76+
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
77+
*/
78+
public static DynamicMessage convertJsonToProtoMessage(
79+
Descriptor protoSchema, TableSchema tableSchema, JSONObject json)
80+
throws IllegalArgumentException {
81+
Preconditions.checkNotNull(json, "JSONObject is null.");
82+
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
83+
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
84+
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");
85+
86+
return convertJsonToProtoMessageImpl(
87+
protoSchema, tableSchema.getFieldsList(), json, "root", /*topLevel=*/ true);
6288
}
6389

6490
/**
@@ -71,9 +97,12 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
7197
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
7298
*/
7399
private static DynamicMessage convertJsonToProtoMessageImpl(
74-
Descriptor protoSchema, JSONObject json, String jsonScope, boolean topLevel)
100+
Descriptor protoSchema,
101+
List<TableFieldSchema> tableSchema,
102+
JSONObject json,
103+
String jsonScope,
104+
boolean topLevel)
75105
throws IllegalArgumentException {
76-
77106
DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema);
78107
String[] jsonNames = JSONObject.getNames(json);
79108
if (jsonNames == null) {
@@ -90,10 +119,25 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
90119
throw new IllegalArgumentException(
91120
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
92121
}
122+
TableFieldSchema fieldSchema = null;
123+
if (tableSchema != null) {
124+
// protoSchema is generated from tableSchema so their field ordering should match.
125+
fieldSchema = tableSchema.get(field.getIndex());
126+
if (!fieldSchema.getName().equals(field.getName())) {
127+
throw new ValidationException(
128+
"Field at index "
129+
+ field.getIndex()
130+
+ " has mismatch names ("
131+
+ fieldSchema.getName()
132+
+ ") ("
133+
+ field.getName()
134+
+ ")");
135+
}
136+
}
93137
if (!field.isRepeated()) {
94-
fillField(protoMsg, field, json, jsonName, currentScope);
138+
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
95139
} else {
96-
fillRepeatedField(protoMsg, field, json, jsonName, currentScope);
140+
fillRepeatedField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
97141
}
98142
}
99143

@@ -127,6 +171,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
127171
private static void fillField(
128172
DynamicMessage.Builder protoMsg,
129173
FieldDescriptor fieldDescriptor,
174+
TableFieldSchema fieldSchema,
130175
JSONObject json,
131176
String exactJsonKeyName,
132177
String currentScope)
@@ -144,6 +189,25 @@ private static void fillField(
144189
}
145190
break;
146191
case BYTES:
192+
if (fieldSchema != null) {
193+
if (fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
194+
if (val instanceof String) {
195+
protoMsg.setField(
196+
fieldDescriptor,
197+
BigDecimalByteStringEncoder.encodeToNumericByteString(
198+
new BigDecimal((String) val)));
199+
return;
200+
}
201+
} else if (fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
202+
if (val instanceof String) {
203+
protoMsg.setField(
204+
fieldDescriptor,
205+
BigDecimalByteStringEncoder.encodeToNumericByteString(
206+
new BigDecimal((String) val)));
207+
return;
208+
}
209+
}
210+
}
147211
if (val instanceof ByteString) {
148212
protoMsg.setField(fieldDescriptor, ((ByteString) val).toByteArray());
149213
return;
@@ -170,6 +234,29 @@ private static void fillField(
170234
}
171235
break;
172236
case INT64:
237+
if (fieldSchema != null) {
238+
if (fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
239+
if (val instanceof String) {
240+
protoMsg.setField(
241+
fieldDescriptor,
242+
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
243+
return;
244+
} else if (val instanceof Long) {
245+
protoMsg.setField(fieldDescriptor, (Long) val);
246+
return;
247+
}
248+
} else if (fieldSchema.getType() == TableFieldSchema.Type.TIME) {
249+
if (val instanceof String) {
250+
protoMsg.setField(
251+
fieldDescriptor,
252+
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
253+
return;
254+
} else if (val instanceof Long) {
255+
protoMsg.setField(fieldDescriptor, (Long) val);
256+
return;
257+
}
258+
}
259+
}
173260
if (val instanceof Integer) {
174261
protoMsg.setField(fieldDescriptor, new Long((Integer) val));
175262
return;
@@ -206,6 +293,7 @@ private static void fillField(
206293
fieldDescriptor,
207294
convertJsonToProtoMessageImpl(
208295
fieldDescriptor.getMessageType(),
296+
fieldSchema == null ? null : fieldSchema.getFieldsList(),
209297
json.getJSONObject(exactJsonKeyName),
210298
currentScope,
211299
/*topLevel =*/ false));
@@ -232,6 +320,7 @@ private static void fillField(
232320
private static void fillRepeatedField(
233321
DynamicMessage.Builder protoMsg,
234322
FieldDescriptor fieldDescriptor,
323+
TableFieldSchema fieldSchema,
235324
JSONObject json,
236325
String exactJsonKeyName,
237326
String currentScope)
@@ -259,40 +348,81 @@ private static void fillRepeatedField(
259348
}
260349
break;
261350
case BYTES:
262-
if (val instanceof JSONArray) {
263-
try {
264-
byte[] bytes = new byte[((JSONArray) val).length()];
265-
for (int j = 0; j < ((JSONArray) val).length(); j++) {
266-
bytes[j] = (byte) ((JSONArray) val).getInt(j);
267-
if (bytes[j] != ((JSONArray) val).getInt(j)) {
268-
throw new IllegalArgumentException(
269-
String.format(
270-
"Error: "
271-
+ currentScope
272-
+ "["
273-
+ index
274-
+ "] could not be converted to byte[]."));
351+
Boolean added = false;
352+
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
353+
if (val instanceof String) {
354+
protoMsg.addRepeatedField(
355+
fieldDescriptor,
356+
BigDecimalByteStringEncoder.encodeToNumericByteString(
357+
new BigDecimal((String) val)));
358+
added = true;
359+
}
360+
} else if (fieldSchema != null
361+
&& fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
362+
if (val instanceof String) {
363+
protoMsg.addRepeatedField(
364+
fieldDescriptor,
365+
BigDecimalByteStringEncoder.encodeToNumericByteString(
366+
new BigDecimal((String) val)));
367+
added = true;
368+
}
369+
}
370+
if (!added) {
371+
if (val instanceof JSONArray) {
372+
try {
373+
byte[] bytes = new byte[((JSONArray) val).length()];
374+
for (int j = 0; j < ((JSONArray) val).length(); j++) {
375+
bytes[j] = (byte) ((JSONArray) val).getInt(j);
376+
if (bytes[j] != ((JSONArray) val).getInt(j)) {
377+
throw new IllegalArgumentException(
378+
String.format(
379+
"Error: "
380+
+ currentScope
381+
+ "["
382+
+ index
383+
+ "] could not be converted to byte[]."));
384+
}
275385
}
386+
protoMsg.addRepeatedField(fieldDescriptor, bytes);
387+
} catch (JSONException e) {
388+
throw new IllegalArgumentException(
389+
String.format(
390+
"Error: "
391+
+ currentScope
392+
+ "["
393+
+ index
394+
+ "] could not be converted to byte[]."));
276395
}
277-
protoMsg.addRepeatedField(fieldDescriptor, bytes);
278-
} catch (JSONException e) {
279-
throw new IllegalArgumentException(
280-
String.format(
281-
"Error: "
282-
+ currentScope
283-
+ "["
284-
+ index
285-
+ "] could not be converted to byte[]."));
396+
} else if (val instanceof ByteString) {
397+
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
398+
return;
399+
} else {
400+
fail = true;
286401
}
287-
} else if (val instanceof ByteString) {
288-
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
289-
return;
290-
} else {
291-
fail = true;
292402
}
293403
break;
294404
case INT64:
295-
if (val instanceof Integer) {
405+
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
406+
if (val instanceof String) {
407+
protoMsg.addRepeatedField(
408+
fieldDescriptor,
409+
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
410+
} else if (val instanceof Long) {
411+
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
412+
} else {
413+
fail = true;
414+
}
415+
} else if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.TIME) {
416+
if (val instanceof String) {
417+
protoMsg.addRepeatedField(
418+
fieldDescriptor,
419+
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
420+
} else if (val instanceof Long) {
421+
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
422+
} else {
423+
fail = true;
424+
}
425+
} else if (val instanceof Integer) {
296426
protoMsg.addRepeatedField(fieldDescriptor, new Long((Integer) val));
297427
} else if (val instanceof Long) {
298428
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
@@ -330,6 +460,7 @@ private static void fillRepeatedField(
330460
fieldDescriptor,
331461
convertJsonToProtoMessageImpl(
332462
fieldDescriptor.getMessageType(),
463+
fieldSchema == null ? null : fieldSchema.getFieldsList(),
333464
jsonArray.getJSONObject(i),
334465
currentScope,
335466
/*topLevel =*/ false));

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptorTest.java

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,18 @@ public void testStructComplex() throws Exception {
164164
.setMode(TableFieldSchema.Mode.REQUIRED)
165165
.setName("test_date")
166166
.build();
167+
final TableFieldSchema test_datetime =
168+
TableFieldSchema.newBuilder()
169+
.setType(TableFieldSchema.Type.DATETIME)
170+
.setMode(TableFieldSchema.Mode.NULLABLE)
171+
.setName("test_datetime")
172+
.build();
173+
final TableFieldSchema test_datetime_str =
174+
TableFieldSchema.newBuilder()
175+
.setType(TableFieldSchema.Type.DATETIME)
176+
.setMode(TableFieldSchema.Mode.REPEATED)
177+
.setName("test_datetime_str")
178+
.build();
167179
final TableFieldSchema ComplexLvl2 =
168180
TableFieldSchema.newBuilder()
169181
.setType(TableFieldSchema.Type.STRUCT)
@@ -203,12 +215,36 @@ public void testStructComplex() throws Exception {
203215
.setMode(TableFieldSchema.Mode.NULLABLE)
204216
.setName("test_time")
205217
.build();
218+
final TableFieldSchema TEST_TIME_STR =
219+
TableFieldSchema.newBuilder()
220+
.setType(TableFieldSchema.Type.TIME)
221+
.setMode(TableFieldSchema.Mode.NULLABLE)
222+
.setName("test_time_str")
223+
.build();
206224
final TableFieldSchema TEST_NUMERIC_REPEATED =
207225
TableFieldSchema.newBuilder()
208226
.setType(TableFieldSchema.Type.NUMERIC)
209227
.setMode(TableFieldSchema.Mode.REPEATED)
210228
.setName("test_numeric_repeated")
211229
.build();
230+
final TableFieldSchema TEST_NUMERIC_STR =
231+
TableFieldSchema.newBuilder()
232+
.setType(TableFieldSchema.Type.NUMERIC)
233+
.setMode(TableFieldSchema.Mode.NULLABLE)
234+
.setName("test_numeric_str")
235+
.build();
236+
final TableFieldSchema TEST_BIGNUMERIC =
237+
TableFieldSchema.newBuilder()
238+
.setType(TableFieldSchema.Type.NUMERIC)
239+
.setMode(TableFieldSchema.Mode.NULLABLE)
240+
.setName("test_bignumeric")
241+
.build();
242+
final TableFieldSchema TEST_BIGNUMERIC_STR =
243+
TableFieldSchema.newBuilder()
244+
.setType(TableFieldSchema.Type.NUMERIC)
245+
.setMode(TableFieldSchema.Mode.REPEATED)
246+
.setName("test_bignumeric_str")
247+
.build();
212248
final TableSchema tableSchema =
213249
TableSchema.newBuilder()
214250
.addFields(0, test_int)
@@ -217,13 +253,19 @@ public void testStructComplex() throws Exception {
217253
.addFields(3, test_bool)
218254
.addFields(4, test_double)
219255
.addFields(5, test_date)
220-
.addFields(6, ComplexLvl1)
221-
.addFields(7, ComplexLvl2)
222-
.addFields(8, TEST_NUMERIC)
223-
.addFields(9, TEST_GEO)
224-
.addFields(10, TEST_TIMESTAMP)
225-
.addFields(11, TEST_TIME)
226-
.addFields(12, TEST_NUMERIC_REPEATED)
256+
.addFields(6, test_datetime)
257+
.addFields(7, test_datetime_str)
258+
.addFields(8, ComplexLvl1)
259+
.addFields(9, ComplexLvl2)
260+
.addFields(10, TEST_NUMERIC)
261+
.addFields(11, TEST_GEO)
262+
.addFields(12, TEST_TIMESTAMP)
263+
.addFields(13, TEST_TIME)
264+
.addFields(14, TEST_TIME_STR)
265+
.addFields(15, TEST_NUMERIC_REPEATED)
266+
.addFields(16, TEST_NUMERIC_STR)
267+
.addFields(17, TEST_BIGNUMERIC)
268+
.addFields(18, TEST_BIGNUMERIC_STR)
227269
.build();
228270
final Descriptor descriptor =
229271
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);

0 commit comments

Comments
 (0)