@@ -87,7 +87,7 @@ public void setUp() throws Exception {
8787
8888 @ Test
8989 public void testMultiplexedAppendSuccess () throws Exception {
90- try (ConnectionWorker connectionWorker = createConnectionWorker ()) {
90+ try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker ()) {
9191 long appendCount = 20 ;
9292 for (long i = 0 ; i < appendCount ; i ++) {
9393 testBigQueryWrite .addResponse (createAppendResponse (i ));
@@ -150,7 +150,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
150150
151151 // We will get the request as the pattern of:
152152 // (writer_stream: t1, schema: t1)
153- // (writer_stream: _ , schema: _)
153+ // (writer_stream: t1 , schema: _)
154154 // (writer_stream: t2, schema: t2) -> multiplexing entered.
155155 // (writer_stream: t2, schema: _)
156156 // (writer_stream: t1, schema: t1)
@@ -164,11 +164,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
164164 break ;
165165 case 1 :
166166 // The write stream is empty until we enter multiplexing.
167- if (i == 1 ) {
168- assertThat (serverRequest .getWriteStream ()).isEmpty ();
169- } else {
170- assertThat (serverRequest .getWriteStream ()).isEqualTo (TEST_STREAM_1 );
171- }
167+ assertThat (serverRequest .getWriteStream ()).isEqualTo (TEST_STREAM_1 );
172168 // Schema is empty if not at the first request after table switch.
173169 assertThat (serverRequest .getProtoRows ().hasWriterSchema ()).isFalse ();
174170 break ;
@@ -198,7 +194,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
198194
199195 @ Test
200196 public void testAppendInSameStream_switchSchema () throws Exception {
201- try (ConnectionWorker connectionWorker = createConnectionWorker ()) {
197+ try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker ()) {
202198 long appendCount = 20 ;
203199 for (long i = 0 ; i < appendCount ; i ++) {
204200 testBigQueryWrite .addResponse (createAppendResponse (i ));
@@ -279,26 +275,20 @@ public void testAppendInSameStream_switchSchema() throws Exception {
279275
280276 // We will get the request as the pattern of:
281277 // (writer_stream: t1, schema: schema1)
282- // (writer_stream: _ , schema: _)
278+ // (writer_stream: t1 , schema: _)
283279 // (writer_stream: t1, schema: schema3)
284280 // (writer_stream: t1, schema: _)
285281 // (writer_stream: t1, schema: schema1)
286282 // (writer_stream: t1, schema: _)
287283 switch (i % 4 ) {
288284 case 0 :
289- if (i == 0 ) {
290- assertThat (serverRequest .getWriteStream ()).isEqualTo (TEST_STREAM_1 );
291- }
285+ assertThat (serverRequest .getWriteStream ()).isEqualTo (TEST_STREAM_1 );
292286 assertThat (
293287 serverRequest .getProtoRows ().getWriterSchema ().getProtoDescriptor ().getName ())
294288 .isEqualTo ("foo" );
295289 break ;
296290 case 1 :
297- if (i == 1 ) {
298- assertThat (serverRequest .getWriteStream ()).isEmpty ();
299- } else {
300- assertThat (serverRequest .getWriteStream ()).isEqualTo (TEST_STREAM_1 );
301- }
291+ assertThat (serverRequest .getWriteStream ()).isEqualTo (TEST_STREAM_1 );
302292 // Schema is empty if not at the first request after table switch.
303293 assertThat (serverRequest .getProtoRows ().hasWriterSchema ()).isFalse ();
304294 break ;
@@ -346,7 +336,8 @@ public void testAppendButInflightQueueFull() throws Exception {
346336 client .getSettings (),
347337 retrySettings ,
348338 /*enableRequestProfiler=*/ false ,
349- /*enableOpenTelemetry=*/ false );
339+ /*enableOpenTelemetry=*/ false ,
340+ /*isMultiplexing=*/ false );
350341 testBigQueryWrite .setResponseSleep (org .threeten .bp .Duration .ofSeconds (1 ));
351342 ConnectionWorker .setMaxInflightQueueWaitTime (500 );
352343
@@ -405,7 +396,8 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
405396 client .getSettings (),
406397 retrySettings ,
407398 /*enableRequestProfiler=*/ false ,
408- /*enableOpenTelemetry=*/ false );
399+ /*enableOpenTelemetry=*/ false ,
400+ /*isMultiplexing=*/ true );
409401 testBigQueryWrite .setResponseSleep (org .threeten .bp .Duration .ofSeconds (1 ));
410402 ConnectionWorker .setMaxInflightQueueWaitTime (500 );
411403
@@ -476,7 +468,8 @@ public void testLocationMismatch() throws Exception {
476468 client .getSettings (),
477469 retrySettings ,
478470 /*enableRequestProfiler=*/ false ,
479- /*enableOpenTelemetry=*/ false );
471+ /*enableOpenTelemetry=*/ false ,
472+ /*isMultiplexing=*/ true );
480473 StatusRuntimeException ex =
481474 assertThrows (
482475 StatusRuntimeException .class ,
@@ -510,7 +503,8 @@ public void testStreamNameMismatch() throws Exception {
510503 client .getSettings (),
511504 retrySettings ,
512505 /*enableRequestProfiler=*/ false ,
513- /*enableOpenTelemetry=*/ false );
506+ /*enableOpenTelemetry=*/ false ,
507+ /*isMultiplexing=*/ true );
514508 StatusRuntimeException ex =
515509 assertThrows (
516510 StatusRuntimeException .class ,
@@ -539,13 +533,13 @@ private AppendRowsResponse createAppendResponse(long offset) {
539533 .build ();
540534 }
541535
542- private ConnectionWorker createConnectionWorker () throws IOException {
536+ private ConnectionWorker createMultiplexedConnectionWorker () throws IOException {
543537 // By default use only the first table as table reference.
544- return createConnectionWorker (
538+ return createMultiplexedConnectionWorker (
545539 TEST_STREAM_1 , TEST_TRACE_ID , 100 , 1000 , java .time .Duration .ofSeconds (5 ));
546540 }
547541
548- private ConnectionWorker createConnectionWorker (
542+ private ConnectionWorker createMultiplexedConnectionWorker (
549543 String streamName ,
550544 String traceId ,
551545 long maxRequests ,
@@ -565,7 +559,8 @@ private ConnectionWorker createConnectionWorker(
565559 client .getSettings (),
566560 retrySettings ,
567561 /*enableRequestProfiler=*/ false ,
568- /*enableOpenTelemetry=*/ false );
562+ /*enableOpenTelemetry=*/ false ,
563+ /*isMultiplexing=*/ true );
569564 }
570565
571566 private ProtoSchema createProtoSchema (String protoName ) {
@@ -663,7 +658,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
663658 client .getSettings (),
664659 retrySettings ,
665660 /*enableRequestProfiler=*/ false ,
666- /*enableOpenTelemetry=*/ false );
661+ /*enableOpenTelemetry=*/ false ,
662+ /*isMultiplexing*/ false );
667663 org .threeten .bp .Duration durationSleep = org .threeten .bp .Duration .ofSeconds (2 );
668664 testBigQueryWrite .setResponseSleep (durationSleep );
669665
@@ -740,7 +736,8 @@ public void testLongTimeIdleWontFail() throws Exception {
740736 client .getSettings (),
741737 retrySettings ,
742738 /*enableRequestProfiler=*/ false ,
743- /*enableOpenTelemetry=*/ false );
739+ /*enableOpenTelemetry=*/ false ,
740+ /*isMultiplexing*/ false );
744741
745742 long appendCount = 10 ;
746743 for (int i = 0 ; i < appendCount * 2 ; i ++) {
@@ -787,7 +784,8 @@ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, S
787784 client .getSettings (),
788785 retrySettings ,
789786 /*enableRequestProfiler=*/ false ,
790- /*enableOpenTelemetry=*/ true );
787+ /*enableOpenTelemetry=*/ true ,
788+ /*isMultiplexing*/ false );
791789
792790 Attributes attributes = connectionWorker .getTelemetryAttributes ();
793791 String attributesTableId = attributes .get (TelemetryMetrics .telemetryKeyTableId );
@@ -829,7 +827,8 @@ void exerciseOpenTelemetryAttributesWithTraceId(
829827 client .getSettings (),
830828 retrySettings ,
831829 /*enableRequestProfiler=*/ false ,
832- /*enableOpenTelemetry=*/ true );
830+ /*enableOpenTelemetry=*/ true ,
831+ /*isMultiplexing*/ false );
833832
834833 Attributes attributes = connectionWorker .getTelemetryAttributes ();
835834 checkOpenTelemetryTraceIdAttribute (attributes , 0 , expectedField1 );
0 commit comments