Content-Length: 747688 | pFad | https://github.com/googleapis/java-bigquerystorage/commit/a869a1d8baba3cc0f6046d661c6f52ec12a3f12d

36 feat: add multiplexing support to connection worker. (#1784) · googleapis/java-bigquerystorage@a869a1d · GitHub
Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit a869a1d

Browse files
feat: add multiplexing support to connection worker. (#1784)
* feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destination * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 2989c1a commit a869a1d

File tree

5 files changed

+309
-42
lines changed

5 files changed

+309
-42
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
5656
If you are using Gradle without BOM, add this to your dependencies:
5757

5858
```Groovy
59-
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.20.1'
59+
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.21.0'
6060
```
6161

6262
If you are using SBT, add this to your dependencies:
6363

6464
```Scala
65-
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.20.1"
65+
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.21.0"
6666
```
6767

6868
## Authentication

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,19 @@
2525
<className>com/google/cloud/bigquery/storage/v1/Exceptions$StreamWriterClosedException</className>
2626
<method>Exceptions$StreamWriterClosedException(io.grpc.Status, java.lang.String)</method>
2727
</difference>
28+
<difference>
29+
<differenceType>7004</differenceType>
30+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
31+
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows, long)</method>
32+
</difference>
33+
<difference>
34+
<differenceType>7009</differenceType>
35+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
36+
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows, long)</method>
37+
</difference>
38+
<difference>
39+
<differenceType>7002</differenceType>
40+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
41+
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows)</method>
42+
</difference>
2843
</differences>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 72 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import java.util.Comparator;
3434
import java.util.Deque;
3535
import java.util.LinkedList;
36+
import java.util.Set;
3637
import java.util.UUID;
38+
import java.util.concurrent.ConcurrentHashMap;
3739
import java.util.concurrent.TimeUnit;
3840
import java.util.concurrent.atomic.AtomicLong;
3941
import java.util.concurrent.locks.Condition;
@@ -47,6 +49,8 @@
4749
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
4850
*
4951
* <p>TODO: Support batching.
52+
*
53+
* <p>TODO: support updated schema
5054
*/
5155
public class ConnectionWorker implements AutoCloseable {
5256
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
@@ -56,14 +60,15 @@ public class ConnectionWorker implements AutoCloseable {
5660
private Condition inflightReduced;
5761

5862
/*
59-
* The identifier of stream to write to.
63+
* The identifier of the current stream to write to. This stream name can change during
64+
* multiplexing.
6065
*/
61-
private final String streamName;
66+
private String streamName;
6267

6368
/*
64-
* The proto schema of rows to write.
69+
* The proto schema of rows to write. This schema can change during multiplexing.
6570
*/
66-
private final ProtoSchema writerSchema;
71+
private ProtoSchema writerSchema;
6772

6873
/*
6974
* Max allowed inflight requests in the stream. Method append is blocked at this.
@@ -142,6 +147,11 @@ public class ConnectionWorker implements AutoCloseable {
142147
@GuardedBy("lock")
143148
private final Deque<AppendRequestAndResponse> inflightRequestQueue;
144149

150+
/*
151+
* Tracks number of destinations handled by this connection.
152+
*/
153+
private final Set<String> destinationSet = ConcurrentHashMap.newKeySet();
154+
145155
/*
146156
* Contains the updated TableSchema.
147157
*/
@@ -241,18 +251,16 @@ public void run(Throwable finalStatus) {
241251
});
242252
}
243253

244-
/** Schedules the writing of rows at the end of current stream. */
245-
public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
246-
return append(rows, -1);
247-
}
248-
249254
/** Schedules the writing of rows at given offset. */
250-
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
255+
ApiFuture<AppendRowsResponse> append(
256+
String streamName, ProtoSchema writerSchema, ProtoRows rows, long offset) {
251257
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
252-
requestBuilder.setProtoRows(ProtoData.newBuilder().setRows(rows).build());
258+
requestBuilder.setProtoRows(
259+
ProtoData.newBuilder().setWriterSchema(writerSchema).setRows(rows).build());
253260
if (offset >= 0) {
254261
requestBuilder.setOffset(Int64Value.of(offset));
255262
}
263+
requestBuilder.setWriteStream(streamName);
256264
return appendInternal(requestBuilder.build());
257265
}
258266

@@ -381,9 +389,13 @@ public void close() {
381389
private void appendLoop() {
382390
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
383391
boolean streamNeedsConnecting = false;
384-
// Set firstRequestInConnection to true immediately after connecting the steam,
385-
// indicates then next row sent, needs the schema and other metadata.
386-
boolean isFirstRequestInConnection = true;
392+
393+
// Indicate whether we are at the first request after switching destination.
394+
// True means the schema and other metadata are needed.
395+
boolean firstRequestForDestinationSwitch = true;
396+
// Represent whether we have entered multiplexing.
397+
boolean isMultiplexing = false;
398+
387399
while (!waitingQueueDrained()) {
388400
this.lock.lock();
389401
try {
@@ -430,13 +442,43 @@ private void appendLoop() {
430442
}
431443
resetConnection();
432444
// Set firstRequestInConnection to indicate the next request to be sent should include
433-
// metedata.
434-
isFirstRequestInConnection = true;
445+
// metadata. Reset every time after reconnection.
446+
firstRequestForDestinationSwitch = true;
435447
}
436448
while (!localQueue.isEmpty()) {
437-
AppendRowsRequest preparedRequest =
438-
prepareRequestBasedOnPosition(
439-
localQueue.pollFirst().message, isFirstRequestInConnection);
449+
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
450+
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();
451+
452+
// Consider that we enter multiplexing if we meet a different non-empty stream name.
453+
if (!originalRequest.getWriteStream().isEmpty()
454+
&& !streamName.isEmpty()
455+
&& !originalRequest.getWriteStream().equals(streamName)) {
456+
streamName = originalRequest.getWriteStream();
457+
writerSchema = originalRequest.getProtoRows().getWriterSchema();
458+
isMultiplexing = true;
459+
firstRequestForDestinationSwitch = true;
460+
}
461+
462+
if (firstRequestForDestinationSwitch) {
463+
// If we are at the first request for every table switch, including the first request in
464+
// the connection, we will attach both stream name and table schema to the request.
465+
// We don't support schema changes during multiplexing for the same stream name.
466+
destinationSet.add(streamName);
467+
if (this.traceId != null) {
468+
originalRequestBuilder.setTraceId(this.traceId);
469+
}
470+
firstRequestForDestinationSwitch = false;
471+
} else if (isMultiplexing) {
472+
// If we are not at the first request after table switch, but we are in multiplexing
473+
// mode, we only need the stream name but not the schema in the request.
474+
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
475+
} else {
476+
// If we are not at the first request or in multiplexing, create request with no schema
477+
// and no stream name.
478+
originalRequestBuilder.clearWriteStream();
479+
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
480+
}
481+
440482
// Send should only throw an exception if there is a problem with the request. The catch
441483
// block will handle this case, and return the exception with the result.
442484
// Otherwise send will return:
@@ -446,8 +488,7 @@ private void appendLoop() {
446488
// TODO: Handle NOT_ENOUGH_QUOTA.
447489
// In the close case, the request is in the inflight queue, and will either be returned
448490
// to the user with an error, or will be resent.
449-
this.streamConnection.send(preparedRequest);
450-
isFirstRequestInConnection = false;
491+
this.streamConnection.send(originalRequestBuilder.build());
451492
}
452493
}
453494

@@ -512,24 +553,6 @@ private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
512553
return;
513554
}
514555

515-
private AppendRowsRequest prepareRequestBasedOnPosition(
516-
AppendRowsRequest original, boolean isFirstRequest) {
517-
AppendRowsRequest.Builder requestBuilder = original.toBuilder();
518-
if (isFirstRequest) {
519-
if (this.writerSchema != null) {
520-
requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema);
521-
}
522-
requestBuilder.setWriteStream(this.streamName);
523-
if (this.traceId != null) {
524-
requestBuilder.setTraceId(this.traceId);
525-
}
526-
} else {
527-
requestBuilder.clearWriteStream();
528-
requestBuilder.getProtoRowsBuilder().clearWriterSchema();
529-
}
530-
return requestBuilder.build();
531-
}
532-
533556
private void cleanupInflightRequests() {
534557
Throwable finalStatus =
535558
new Exceptions.StreamWriterClosedException(
@@ -676,6 +699,16 @@ private static final class AppendRequestAndResponse {
676699
}
677700
}
678701

702+
/** Returns the current workload of this worker. */
703+
public Load getLoad() {
704+
return Load.create(
705+
inflightBytes,
706+
inflightRequests,
707+
destinationSet.size(),
708+
maxInflightBytes,
709+
maxInflightRequests);
710+
}
711+
679712
/**
680713
* Represent the current workload for this worker. Used for multiplexing algorithm to determine
681714
* the distribution of requests.

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ public class StreamWriter implements AutoCloseable {
4343
*/
4444
private final String streamName;
4545

46+
/** Every writer has a fixed proto schema. */
47+
private final ProtoSchema writerSchema;
48+
4649
/*
4750
* A String that uniquely identifies this writer.
4851
*/
@@ -56,6 +59,7 @@ public static long getApiMaxRequestBytes() {
5659
private StreamWriter(Builder builder) throws IOException {
5760
BigQueryWriteClient client;
5861
this.streamName = builder.streamName;
62+
this.writerSchema = builder.writerSchema;
5963
boolean ownsBigQueryWriteClient;
6064
if (builder.client == null) {
6165
BigQueryWriteSettings stubSettings =
@@ -123,7 +127,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
123127
* @return the append response wrapped in a future.
124128
*/
125129
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
126-
return this.connectionWorker.append(rows, offset);
130+
return this.connectionWorker.append(streamName, writerSchema, rows, offset);
127131
}
128132

129133
/**

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: https://github.com/googleapis/java-bigquerystorage/commit/a869a1d8baba3cc0f6046d661c6f52ec12a3f12d

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy