Content-Length: 561240 | pFad | https://github.com/googleapis/java-bigquerystorage/commit/320f5fc6a2a180e361f1a5a375095a65ec62003f

27 fix: add extra JsonWriterTest to show that the LimitBehavior addition… · googleapis/java-bigquerystorage@320f5fc · GitHub
Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 320f5fc

Browse files
authored
fix: add extra JsonWriterTest to show that the LimitBehavior addition is not breaking (#1643)
* fix[1539] * . * . * fix: Add an extra jsonWriterTest for Limit Behavior * . * . * . * . * fix an issue that we should reject request before it is added to the queue. * . * .
1 parent 77f44d8 commit 320f5fc

File tree

4 files changed

+72
-33
lines changed

4 files changed

+72
-33
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,6 @@ private JsonStreamWriter(Builder builder)
7878
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
7979
this.totalMessageSize = protoSchema.getSerializedSize();
8080
streamWriterBuilder.setWriterSchema(protoSchema);
81-
if (builder.flowControlSettings != null) {
82-
streamWriterBuilder.setLimitExceededBehavior(
83-
builder.flowControlSettings.getLimitExceededBehavior());
84-
}
8581
setStreamWriterSettings(
8682
builder.channelProvider,
8783
builder.credentialsProvider,

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,16 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)
314314
.withDescription("Connection is already closed")));
315315
return requestWrapper.appendResult;
316316
}
317+
// Check if queue is going to be full before adding the request.
318+
if ((this.inflightRequests + 1 >= this.maxInflightRequests
319+
|| this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes)
320+
&& (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException)) {
321+
throw new StatusRuntimeException(
322+
Status.fromCode(Code.RESOURCE_EXHAUSTED)
323+
.withDescription(
324+
"Exceeds client side inflight buffer, consider add more buffer or open more connections."));
325+
}
326+
317327
if (connectionFinalStatus != null) {
318328
requestWrapper.appendResult.setException(
319329
new StatusRuntimeException(
@@ -339,29 +349,18 @@ private void maybeWaitForInflightQuota() {
339349
long start_time = System.currentTimeMillis();
340350
while (this.inflightRequests >= this.maxInflightRequests
341351
|| this.inflightBytes >= this.maxInflightBytes) {
342-
if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) {
343-
throw new StatusRuntimeException(
344-
Status.fromCode(Code.RESOURCE_EXHAUSTED)
345-
.withDescription(
346-
"Exceeds client side inflight buffer, consider add more buffer or open more connections."));
347-
} else if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) {
352+
try {
353+
inflightReduced.await(100, TimeUnit.MILLISECONDS);
354+
} catch (InterruptedException e) {
355+
log.warning(
356+
"Interrupted while waiting for inflight quota. Stream: "
357+
+ streamName
358+
+ " Error: "
359+
+ e.toString());
348360
throw new StatusRuntimeException(
349-
Status.fromCode(Code.INVALID_ARGUMENT)
350-
.withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter."));
351-
} else {
352-
try {
353-
inflightReduced.await(100, TimeUnit.MILLISECONDS);
354-
} catch (InterruptedException e) {
355-
log.warning(
356-
"Interrupted while waiting for inflight quota. Stream: "
357-
+ streamName
358-
+ " Error: "
359-
+ e.toString());
360-
throw new StatusRuntimeException(
361-
Status.fromCode(Code.CANCELLED)
362-
.withCause(e)
363-
.withDescription("Interrupted while waiting for quota."));
364-
}
361+
Status.fromCode(Code.CANCELLED)
362+
.withCause(e)
363+
.withDescription("Interrupted while waiting for quota."));
365364
}
366365
}
367366
inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000);
@@ -812,7 +811,12 @@ public Builder setTraceId(String traceId) {
812811
* @return
813812
*/
814813
public Builder setLimitExceededBehavior(
815-
FlowController.LimitExceededBehavior limitExceededBehavior) {
814+
FlowController.LimitExceededBehavior limitExceededBehavior) throws StatusRuntimeException {
815+
if (limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) {
816+
throw new StatusRuntimeException(
817+
Status.fromCode(Code.INVALID_ARGUMENT)
818+
.withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter."));
819+
}
816820
this.limitExceededBehavior = limitExceededBehavior;
817821
return this;
818822
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,4 +581,26 @@ public void run() throws Throwable {
581581
"Exceeds client side inflight buffer, consider add more buffer or open more connections"));
582582
}
583583
}
584+
585+
// This is to test the new addition didn't break previous settings, i.e., sets the inflight limit
586+
// without limit beahvior.
587+
@Test
588+
public void testFlowControlSettingNoLimitBehavior() throws Exception {
589+
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
590+
try (JsonStreamWriter writer =
591+
JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema)
592+
.setChannelProvider(channelProvider)
593+
.setCredentialsProvider(NoCredentialsProvider.create())
594+
.setFlowControlSettings(
595+
FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(1L).build())
596+
.build()) {
597+
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
598+
JSONObject foo = new JSONObject();
599+
foo.put("test_int", 10);
600+
JSONArray jsonArr = new JSONArray();
601+
jsonArr.put(foo);
602+
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
603+
appendFuture.get();
604+
}
605+
}
584606
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -552,12 +552,6 @@ public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception {
552552
.setMaxInflightBytes(1)
553553
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
554554
.build();
555-
// Server will sleep 100ms before every response.
556-
testBigQueryWrite.setResponseSleep(Duration.ofMillis(100));
557-
long appendCount = 10;
558-
for (int i = 0; i < appendCount; i++) {
559-
testBigQueryWrite.addResponse(createAppendResponse(i));
560-
}
561555
StatusRuntimeException ex =
562556
assertThrows(
563557
StatusRuntimeException.class,
@@ -577,6 +571,29 @@ public void run() throws Throwable {
577571
writer.close();
578572
}
579573

574+
@Test
575+
public void testLimitBehaviorIgnoreNotAccepted() throws Exception {
576+
StatusRuntimeException ex =
577+
assertThrows(
578+
StatusRuntimeException.class,
579+
new ThrowingRunnable() {
580+
@Override
581+
public void run() throws Throwable {
582+
StreamWriter writer =
583+
StreamWriter.newBuilder(TEST_STREAM, client)
584+
.setWriterSchema(createProtoSchema())
585+
.setMaxInflightBytes(1)
586+
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
587+
.build();
588+
}
589+
});
590+
assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
591+
assertTrue(
592+
ex.getStatus()
593+
.getDescription()
594+
.contains("LimitExceededBehavior.Ignore is not supported on StreamWriter."));
595+
}
596+
580597
@Test
581598
public void testMessageTooLarge() throws Exception {
582599
StreamWriter writer = getTestStreamWriter();

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/googleapis/java-bigquerystorage/commit/320f5fc6a2a180e361f1a5a375095a65ec62003f

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy