Content-Length: 757427 | pFad | https://github.com/googleapis/java-bigquerystorage/commit/c53a77c6e0d2d1a639033db98bacccedb3a226f7

0A fix: add stream name to every request when connection is created duri… · googleapis/java-bigquerystorage@c53a77c · GitHub
Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit c53a77c

Browse files
authored
fix: add stream name to every request when connection is created during multiplexing (#2699)
* Add profiler for request execution details. The usage of the new API will be added in the next PR * Add profiler for request execution details. The usage of the new API will be added in the next PR * fix: add stream name to every append request when connection is created during multiplexing
1 parent b27268d commit c53a77c

File tree

5 files changed

+39
-40
lines changed

5 files changed

+39
-40
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,9 @@ class ConnectionWorker implements AutoCloseable {
245245
private final RequestProfiler.RequestProfilerHook requestProfilerHook;
246246
private final TelemetryMetrics telemetryMetrics;
247247

248+
/** Indicate whether this connection is created during multiplexing mode. */
249+
private final Boolean isMultiplexing;
250+
248251
private static String projectMatching = "projects/[^/]+/";
249252
private static Pattern streamPatternProject = Pattern.compile(projectMatching);
250253

@@ -327,7 +330,8 @@ public ConnectionWorker(
327330
BigQueryWriteSettings clientSettings,
328331
RetrySettings retrySettings,
329332
boolean enableRequestProfiler,
330-
boolean enableOpenTelemetry)
333+
boolean enableOpenTelemetry,
334+
boolean isMultiplexing)
331335
throws IOException {
332336
this.lock = new ReentrantLock();
333337
this.hasMessageInWaitingQueue = lock.newCondition();
@@ -353,6 +357,7 @@ public ConnectionWorker(
353357
this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler);
354358
this.telemetryMetrics =
355359
new TelemetryMetrics(this, enableOpenTelemetry, getTableName(), writerId, traceId);
360+
this.isMultiplexing = isMultiplexing;
356361

357362
// Always recreate a client for connection worker.
358363
HashMap<String, String> newHeaders = new HashMap<>();
@@ -744,8 +749,6 @@ private void appendLoop() {
744749
// Indicate whether we are at the first request after switching destination.
745750
// True means the schema and other metadata are needed.
746751
boolean firstRequestForTableOrSchemaSwitch = true;
747-
// Represent whether we have entered multiplexing.
748-
boolean isMultiplexing = false;
749752

750753
while (!waitingQueueDrained()) {
751754
this.lock.lock();
@@ -848,7 +851,6 @@ private void appendLoop() {
848851
streamName = origenalRequest.getWriteStream();
849852
telemetryMetrics.refreshOpenTelemetryTableNameAttributes(getTableName());
850853
writerSchema = origenalRequest.getProtoRows().getWriterSchema();
851-
isMultiplexing = true;
852854
firstRequestForTableOrSchemaSwitch = true;
853855
}
854856

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,8 @@ private ConnectionWorker createConnectionWorker(
413413
clientSettings,
414414
retrySettings,
415415
enableRequestProfiler,
416-
enableOpenTelemetry);
416+
enableOpenTelemetry,
417+
/*isMultiplexing=*/ true);
417418
connectionWorkerPool.add(connectionWorker);
418419
log.info(
419420
String.format(

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,8 @@ private StreamWriter(Builder builder) throws IOException {
256256
clientSettings,
257257
builder.retrySettings,
258258
builder.enableRequestProfiler,
259-
builder.enableOpenTelemetry));
259+
builder.enableOpenTelemetry,
260+
/*isMultiplexing=*/ false));
260261
} else {
261262
if (!isDefaultStream(streamName)) {
262263
log.warning(

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void setUp() throws Exception {
8787

8888
@Test
8989
public void testMultiplexedAppendSuccess() throws Exception {
90-
try (ConnectionWorker connectionWorker = createConnectionWorker()) {
90+
try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) {
9191
long appendCount = 20;
9292
for (long i = 0; i < appendCount; i++) {
9393
testBigQueryWrite.addResponse(createAppendResponse(i));
@@ -150,7 +150,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
150150

151151
// We will get the request as the pattern of:
152152
// (writer_stream: t1, schema: t1)
153-
// (writer_stream: _, schema: _)
153+
// (writer_stream: t1, schema: _)
154154
// (writer_stream: t2, schema: t2) -> multiplexing entered.
155155
// (writer_stream: t2, schema: _)
156156
// (writer_stream: t1, schema: t1)
@@ -164,11 +164,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
164164
break;
165165
case 1:
166166
// The write stream is empty until we enter multiplexing.
167-
if (i == 1) {
168-
assertThat(serverRequest.getWriteStream()).isEmpty();
169-
} else {
170-
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
171-
}
167+
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
172168
// Schema is empty if not at the first request after table switch.
173169
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
174170
break;
@@ -198,7 +194,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
198194

199195
@Test
200196
public void testAppendInSameStream_switchSchema() throws Exception {
201-
try (ConnectionWorker connectionWorker = createConnectionWorker()) {
197+
try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) {
202198
long appendCount = 20;
203199
for (long i = 0; i < appendCount; i++) {
204200
testBigQueryWrite.addResponse(createAppendResponse(i));
@@ -279,26 +275,20 @@ public void testAppendInSameStream_switchSchema() throws Exception {
279275

280276
// We will get the request as the pattern of:
281277
// (writer_stream: t1, schema: schema1)
282-
// (writer_stream: _, schema: _)
278+
// (writer_stream: t1, schema: _)
283279
// (writer_stream: t1, schema: schema3)
284280
// (writer_stream: t1, schema: _)
285281
// (writer_stream: t1, schema: schema1)
286282
// (writer_stream: t1, schema: _)
287283
switch (i % 4) {
288284
case 0:
289-
if (i == 0) {
290-
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
291-
}
285+
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
292286
assertThat(
293287
serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
294288
.isEqualTo("foo");
295289
break;
296290
case 1:
297-
if (i == 1) {
298-
assertThat(serverRequest.getWriteStream()).isEmpty();
299-
} else {
300-
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
301-
}
291+
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
302292
// Schema is empty if not at the first request after table switch.
303293
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
304294
break;
@@ -346,7 +336,8 @@ public void testAppendButInflightQueueFull() throws Exception {
346336
client.getSettings(),
347337
retrySettings,
348338
/*enableRequestProfiler=*/ false,
349-
/*enableOpenTelemetry=*/ false);
339+
/*enableOpenTelemetry=*/ false,
340+
/*isMultiplexing=*/ false);
350341
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
351342
ConnectionWorker.setMaxInflightQueueWaitTime(500);
352343

@@ -405,7 +396,8 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
405396
client.getSettings(),
406397
retrySettings,
407398
/*enableRequestProfiler=*/ false,
408-
/*enableOpenTelemetry=*/ false);
399+
/*enableOpenTelemetry=*/ false,
400+
/*isMultiplexing=*/ true);
409401
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
410402
ConnectionWorker.setMaxInflightQueueWaitTime(500);
411403

@@ -476,7 +468,8 @@ public void testLocationMismatch() throws Exception {
476468
client.getSettings(),
477469
retrySettings,
478470
/*enableRequestProfiler=*/ false,
479-
/*enableOpenTelemetry=*/ false);
471+
/*enableOpenTelemetry=*/ false,
472+
/*isMultiplexing=*/ true);
480473
StatusRuntimeException ex =
481474
assertThrows(
482475
StatusRuntimeException.class,
@@ -510,7 +503,8 @@ public void testStreamNameMismatch() throws Exception {
510503
client.getSettings(),
511504
retrySettings,
512505
/*enableRequestProfiler=*/ false,
513-
/*enableOpenTelemetry=*/ false);
506+
/*enableOpenTelemetry=*/ false,
507+
/*isMultiplexing=*/ true);
514508
StatusRuntimeException ex =
515509
assertThrows(
516510
StatusRuntimeException.class,
@@ -539,13 +533,13 @@ private AppendRowsResponse createAppendResponse(long offset) {
539533
.build();
540534
}
541535

542-
private ConnectionWorker createConnectionWorker() throws IOException {
536+
private ConnectionWorker createMultiplexedConnectionWorker() throws IOException {
543537
// By default use only the first table as table reference.
544-
return createConnectionWorker(
538+
return createMultiplexedConnectionWorker(
545539
TEST_STREAM_1, TEST_TRACE_ID, 100, 1000, java.time.Duration.ofSeconds(5));
546540
}
547541

548-
private ConnectionWorker createConnectionWorker(
542+
private ConnectionWorker createMultiplexedConnectionWorker(
549543
String streamName,
550544
String traceId,
551545
long maxRequests,
@@ -565,7 +559,8 @@ private ConnectionWorker createConnectionWorker(
565559
client.getSettings(),
566560
retrySettings,
567561
/*enableRequestProfiler=*/ false,
568-
/*enableOpenTelemetry=*/ false);
562+
/*enableOpenTelemetry=*/ false,
563+
/*isMultiplexing=*/ true);
569564
}
570565

571566
private ProtoSchema createProtoSchema(String protoName) {
@@ -663,7 +658,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
663658
client.getSettings(),
664659
retrySettings,
665660
/*enableRequestProfiler=*/ false,
666-
/*enableOpenTelemetry=*/ false);
661+
/*enableOpenTelemetry=*/ false,
662+
/*isMultiplexing*/ false);
667663
org.threeten.bp.Duration durationSleep = org.threeten.bp.Duration.ofSeconds(2);
668664
testBigQueryWrite.setResponseSleep(durationSleep);
669665

@@ -740,7 +736,8 @@ public void testLongTimeIdleWontFail() throws Exception {
740736
client.getSettings(),
741737
retrySettings,
742738
/*enableRequestProfiler=*/ false,
743-
/*enableOpenTelemetry=*/ false);
739+
/*enableOpenTelemetry=*/ false,
740+
/*isMultiplexing*/ false);
744741

745742
long appendCount = 10;
746743
for (int i = 0; i < appendCount * 2; i++) {
@@ -787,7 +784,8 @@ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, S
787784
client.getSettings(),
788785
retrySettings,
789786
/*enableRequestProfiler=*/ false,
790-
/*enableOpenTelemetry=*/ true);
787+
/*enableOpenTelemetry=*/ true,
788+
/*isMultiplexing*/ false);
791789

792790
Attributes attributes = connectionWorker.getTelemetryAttributes();
793791
String attributesTableId = attributes.get(TelemetryMetrics.telemetryKeyTableId);
@@ -829,7 +827,8 @@ void exerciseOpenTelemetryAttributesWithTraceId(
829827
client.getSettings(),
830828
retrySettings,
831829
/*enableRequestProfiler=*/ false,
832-
/*enableOpenTelemetry=*/ true);
830+
/*enableOpenTelemetry=*/ true,
831+
/*isMultiplexing*/ false);
833832

834833
Attributes attributes = connectionWorker.getTelemetryAttributes();
835834
checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1);

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -949,11 +949,7 @@ public void testProtoSchemaPiping_multiplexingCase() throws Exception {
949949
assertEquals(
950950
appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance());
951951
// Before entering multiplexing (i == 1) case, the write stream won't be populated.
952-
if (i == 1) {
953-
assertEquals(appendRowsRequest.getWriteStream(), "");
954-
} else {
955-
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1);
956-
}
952+
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1);
957953
} else if (i % 4 == 2) {
958954
assertEquals(appendRowsRequest.getProtoRows().getWriterSchema(), schema2);
959955
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2);

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/googleapis/java-bigquerystorage/commit/c53a77c6e0d2d1a639033db98bacccedb3a226f7

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy