Content-Length: 646426 | pFad | https://github.com/googleapis/java-bigquerystorage/commit/b6b515f57a0f6956c9d9f902a5e3e16edc845a48

F1 feat: Add fully managed schema support on json writer (#1794) · googleapis/java-bigquerystorage@b6b515f · GitHub
Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit b6b515f

Browse files
authored
feat: Add fully managed schema support on json writer (#1794)
b/247249766
1 parent 745ceb4 commit b6b515f

File tree

2 files changed

+183
-1
lines changed

2 files changed

+183
-1
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,21 @@ public static Builder newBuilder(
282282
return new Builder(streamOrTableName, tableSchema, client);
283283
}
284284

285+
/**
286+
* Constructs a JsonStreamWriter builder without a caller-supplied TableSchema; the schema is
287+
* fetched from the write stream through the given BigQueryWriteClient.
288+
*
289+
* @param streamOrTableName either a stream name that must follow
290+
* "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or a table name ("projects/[^/]+/datasets/[^/]+/tables/[^/]+")
291+
* @param client BigQueryWriteClient used to look up the write stream's table schema
292+
* @return Builder
293+
*/
294+
public static Builder newBuilder(String streamOrTableName, BigQueryWriteClient client) {
295+
Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
296+
Preconditions.checkNotNull(client, "BigQuery client is null.");
297+
return new Builder(streamOrTableName, null, client);
298+
}
299+
285300
/** Closes the underlying StreamWriter. */
286301
@Override
287302
public void close() {
@@ -330,8 +345,20 @@ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWrite
330345
} else {
331346
this.streamName = streamOrTableName;
332347
}
333-
this.tableSchema = tableSchema;
334348
this.client = client;
349+
if (tableSchema == null) {
350+
GetWriteStreamRequest writeStreamRequest =
351+
GetWriteStreamRequest.newBuilder()
352+
.setName(this.getStreamName())
353+
.setView(WriteStreamView.FULL)
354+
.build();
355+
356+
WriteStream writeStream = this.client.getWriteStream(writeStreamRequest);
357+
TableSchema writeStreamTableSchema = writeStream.getTableSchema();
358+
this.tableSchema = writeStreamTableSchema;
359+
} else {
360+
this.tableSchema = tableSchema;
361+
}
335362
}
336363

337364
/**

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,161 @@ public void testJsonStreamWriterCommittedStream()
313313
}
314314
}
315315

316+
@Test
317+
public void testJsonStreamWriterWithDefaultSchema()
318+
throws IOException, InterruptedException, ExecutionException,
319+
Descriptors.DescriptorValidationException {
320+
String tableName = "JsonTableDefaultSchema";
321+
TableFieldSchema TEST_STRING =
322+
TableFieldSchema.newBuilder()
323+
.setType(TableFieldSchema.Type.STRING)
324+
.setMode(TableFieldSchema.Mode.NULLABLE)
325+
.setName("test_str")
326+
.build();
327+
TableFieldSchema TEST_NUMERIC =
328+
TableFieldSchema.newBuilder()
329+
.setType(TableFieldSchema.Type.NUMERIC)
330+
.setMode(TableFieldSchema.Mode.REPEATED)
331+
.setName("test_numerics")
332+
.build();
333+
TableFieldSchema TEST_DATE =
334+
TableFieldSchema.newBuilder()
335+
.setType(TableFieldSchema.Type.DATETIME)
336+
.setMode(TableFieldSchema.Mode.NULLABLE)
337+
.setName("test_datetime")
338+
.build();
339+
TableFieldSchema TEST_REPEATED_BYTESTRING =
340+
TableFieldSchema.newBuilder()
341+
.setType(TableFieldSchema.Type.BYTES)
342+
.setMode(TableFieldSchema.Mode.REPEATED)
343+
.setName("test_bytestring_repeated")
344+
.build();
345+
TableFieldSchema TEST_TIMESTAMP =
346+
TableFieldSchema.newBuilder()
347+
.setName("test_timeStamp")
348+
.setType(TableFieldSchema.Type.TIMESTAMP)
349+
.setMode(TableFieldSchema.Mode.NULLABLE)
350+
.build();
351+
TableSchema tableSchema =
352+
TableSchema.newBuilder()
353+
.addFields(0, TEST_STRING)
354+
.addFields(1, TEST_DATE)
355+
.addFields(2, TEST_NUMERIC)
356+
.addFields(3, TEST_REPEATED_BYTESTRING)
357+
.addFields(4, TEST_TIMESTAMP)
358+
.build();
359+
TableInfo tableInfo =
360+
TableInfo.newBuilder(
361+
TableId.of(DATASET, tableName),
362+
StandardTableDefinition.of(
363+
Schema.of(
364+
com.google.cloud.bigquery.Field.newBuilder(
365+
"test_str", StandardSQLTypeName.STRING)
366+
.build(),
367+
com.google.cloud.bigquery.Field.newBuilder(
368+
"test_numerics", StandardSQLTypeName.NUMERIC)
369+
.setMode(Field.Mode.REPEATED)
370+
.build(),
371+
com.google.cloud.bigquery.Field.newBuilder(
372+
"test_datetime", StandardSQLTypeName.DATETIME)
373+
.build(),
374+
com.google.cloud.bigquery.Field.newBuilder(
375+
"test_bytestring_repeated", StandardSQLTypeName.BYTES)
376+
.setMode(Field.Mode.REPEATED)
377+
.build(),
378+
com.google.cloud.bigquery.Field.newBuilder(
379+
"test_timestamp", StandardSQLTypeName.TIMESTAMP)
380+
.build())))
381+
.build();
382+
383+
bigquery.create(tableInfo);
384+
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
385+
try (JsonStreamWriter jsonStreamWriter =
386+
JsonStreamWriter.newBuilder(parent.toString(), client)
387+
.setIgnoreUnknownFields(true)
388+
.build()) {
389+
LOG.info("Sending one message");
390+
JSONObject row1 = new JSONObject();
391+
row1.put("test_str", "aaa");
392+
row1.put(
393+
"test_numerics",
394+
new JSONArray(
395+
new byte[][] {
396+
BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("123.4"))
397+
.toByteArray(),
398+
BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("-9000000"))
399+
.toByteArray()
400+
}));
401+
row1.put("unknown_field", "a");
402+
row1.put(
403+
"test_datetime",
404+
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.of(2020, 10, 1, 12, 0)));
405+
row1.put(
406+
"test_bytestring_repeated",
407+
new JSONArray(
408+
new byte[][] {
409+
ByteString.copyFromUtf8("a").toByteArray(),
410+
ByteString.copyFromUtf8("b").toByteArray()
411+
}));
412+
row1.put("test_timestamp", "2022-02-06 07:24:47.84");
413+
JSONArray jsonArr1 = new JSONArray(new JSONObject[] {row1});
414+
415+
ApiFuture<AppendRowsResponse> response1 = jsonStreamWriter.append(jsonArr1, -1);
416+
417+
assertEquals(0, response1.get().getAppendResult().getOffset().getValue());
418+
419+
JSONObject row2 = new JSONObject();
420+
row1.put("test_str", "bbb");
421+
JSONObject row3 = new JSONObject();
422+
row2.put("test_str", "ccc");
423+
JSONArray jsonArr2 = new JSONArray();
424+
jsonArr2.put(row1);
425+
jsonArr2.put(row2);
426+
427+
JSONObject row4 = new JSONObject();
428+
row4.put("test_str", "ddd");
429+
JSONArray jsonArr3 = new JSONArray();
430+
jsonArr3.put(row4);
431+
432+
JSONObject row5 = new JSONObject();
433+
// Add another ARRAY<BYTES> in a more idiomatic way
434+
JSONArray testArr = new JSONArray(); // create empty JSONArray
435+
testArr.put(0, ByteString.copyFromUtf8("a").toByteArray()); // insert 1st bytes array
436+
testArr.put(1, ByteString.copyFromUtf8("b").toByteArray()); // insert 2nd bytes array
437+
row5.put("test_bytestring_repeated", testArr);
438+
JSONArray jsonArr4 = new JSONArray();
439+
jsonArr4.put(row5);
440+
441+
LOG.info("Sending three more messages");
442+
ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, -1);
443+
LOG.info("Sending two more messages");
444+
ApiFuture<AppendRowsResponse> response3 = jsonStreamWriter.append(jsonArr3, -1);
445+
LOG.info("Sending one more message");
446+
ApiFuture<AppendRowsResponse> response4 = jsonStreamWriter.append(jsonArr4, -1);
447+
Assert.assertFalse(response2.get().getAppendResult().hasOffset());
448+
Assert.assertFalse(response3.get().getAppendResult().hasOffset());
449+
Assert.assertFalse(response4.get().getAppendResult().hasOffset());
450+
451+
TableResult result =
452+
bigquery.listTableData(
453+
tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
454+
Iterator<FieldValueList> iter = result.getValues().iterator();
455+
FieldValueList currentRow = iter.next();
456+
assertEquals("aaa", currentRow.get(0).getStringValue());
457+
assertEquals("-9000000", currentRow.get(1).getRepeatedValue().get(1).getStringValue());
458+
assertEquals("2020-10-01T12:00:00", currentRow.get(2).getStringValue());
459+
assertEquals(2, currentRow.get(3).getRepeatedValue().size());
460+
assertEquals("Yg==", currentRow.get(3).getRepeatedValue().get(1).getStringValue());
461+
assertEquals("bbb", iter.next().get(0).getStringValue());
462+
assertEquals("ccc", iter.next().get(0).getStringValue());
463+
assertEquals("ddd", iter.next().get(0).getStringValue());
464+
FieldValueList currentRow2 = iter.next();
465+
assertEquals("YQ==", currentRow2.get(3).getRepeatedValue().get(0).getStringValue());
466+
assertEquals("Yg==", currentRow2.get(3).getRepeatedValue().get(1).getStringValue());
467+
assertEquals(false, iter.hasNext());
468+
}
469+
}
470+
316471
@Test
317472
public void testJsonStreamWriterWithDefaultStream()
318473
throws IOException, InterruptedException, ExecutionException,

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/googleapis/java-bigquerystorage/commit/b6b515f57a0f6956c9d9f902a5e3e16edc845a48

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy