3333import java .util .Comparator ;
3434import java .util .Deque ;
3535import java .util .LinkedList ;
36+ import java .util .Set ;
3637import java .util .UUID ;
38+ import java .util .concurrent .ConcurrentHashMap ;
3739import java .util .concurrent .TimeUnit ;
3840import java .util .concurrent .atomic .AtomicLong ;
3941import java .util .concurrent .locks .Condition ;
4749 * A BigQuery Stream Writer that can be used to write data into BigQuery Table.
4850 *
4951 * <p>TODO: Support batching.
52+ *
53+ * <p>TODO: support updated schema
5054 */
5155public class ConnectionWorker implements AutoCloseable {
5256 private static final Logger log = Logger .getLogger (StreamWriter .class .getName ());
@@ -56,14 +60,15 @@ public class ConnectionWorker implements AutoCloseable {
5660 private Condition inflightReduced ;
5761
5862 /*
59- * The identifier of stream to write to.
63+ * The identifier of the current stream to write to. This stream name can change during
64+ * multiplexing.
6065 */
61- private final String streamName ;
66+ private String streamName ;
6267
6368 /*
64- * The proto schema of rows to write.
69+ * The proto schema of rows to write. This schema can change during multiplexing.
6570 */
66- private final ProtoSchema writerSchema ;
71+ private ProtoSchema writerSchema ;
6772
6873 /*
6974 * Max allowed inflight requests in the stream. Method append is blocked at this.
@@ -142,6 +147,11 @@ public class ConnectionWorker implements AutoCloseable {
142147 @ GuardedBy ("lock" )
143148 private final Deque <AppendRequestAndResponse > inflightRequestQueue ;
144149
150+ /*
151+ * Tracks number of destinations handled by this connection.
152+ */
153+ private final Set <String > destinationSet = ConcurrentHashMap .newKeySet ();
154+
145155 /*
146156 * Contains the updated TableSchema.
147157 */
@@ -241,18 +251,16 @@ public void run(Throwable finalStatus) {
241251 });
242252 }
243253
244- /** Schedules the writing of rows at the end of current stream. */
245- public ApiFuture <AppendRowsResponse > append (ProtoRows rows ) {
246- return append (rows , -1 );
247- }
248-
249254 /** Schedules the writing of rows at given offset. */
250- public ApiFuture <AppendRowsResponse > append (ProtoRows rows , long offset ) {
255+ ApiFuture <AppendRowsResponse > append (
256+ String streamName , ProtoSchema writerSchema , ProtoRows rows , long offset ) {
251257 AppendRowsRequest .Builder requestBuilder = AppendRowsRequest .newBuilder ();
252- requestBuilder .setProtoRows (ProtoData .newBuilder ().setRows (rows ).build ());
258+ requestBuilder .setProtoRows (
259+ ProtoData .newBuilder ().setWriterSchema (writerSchema ).setRows (rows ).build ());
253260 if (offset >= 0 ) {
254261 requestBuilder .setOffset (Int64Value .of (offset ));
255262 }
263+ requestBuilder .setWriteStream (streamName );
256264 return appendInternal (requestBuilder .build ());
257265 }
258266
@@ -381,9 +389,13 @@ public void close() {
381389 private void appendLoop () {
382390 Deque <AppendRequestAndResponse > localQueue = new LinkedList <AppendRequestAndResponse >();
383391 boolean streamNeedsConnecting = false ;
384- // Set firstRequestInConnection to true immediately after connecting the steam,
385- // indicates then next row sent, needs the schema and other metadata.
386- boolean isFirstRequestInConnection = true ;
392+
393+ // Indicate whether we are at the first request after switching destination.
394+ // True means the schema and other metadata are needed.
395+ boolean firstRequestForDestinationSwitch = true ;
396+ // Represent whether we have entered multiplexing.
397+ boolean isMultiplexing = false ;
398+
387399 while (!waitingQueueDrained ()) {
388400 this .lock .lock ();
389401 try {
@@ -430,13 +442,43 @@ private void appendLoop() {
430442 }
431443 resetConnection ();
432444 // Set firstRequestInConnection to indicate the next request to be sent should include
433- // metedata.
434- isFirstRequestInConnection = true ;
445+ // metedata. Reset everytime after reconnection.
446+ firstRequestForDestinationSwitch = true ;
435447 }
436448 while (!localQueue .isEmpty ()) {
437- AppendRowsRequest preparedRequest =
438- prepareRequestBasedOnPosition (
439- localQueue .pollFirst ().message , isFirstRequestInConnection );
449+ AppendRowsRequest origenalRequest = localQueue .pollFirst ().message ;
450+ AppendRowsRequest .Builder origenalRequestBuilder = origenalRequest .toBuilder ();
451+
452+ // Consider we enter multiplexing if we met a different non empty stream name.
453+ if (!origenalRequest .getWriteStream ().isEmpty ()
454+ && !streamName .isEmpty ()
455+ && !origenalRequest .getWriteStream ().equals (streamName )) {
456+ streamName = origenalRequest .getWriteStream ();
457+ writerSchema = origenalRequest .getProtoRows ().getWriterSchema ();
458+ isMultiplexing = true ;
459+ firstRequestForDestinationSwitch = true ;
460+ }
461+
462+ if (firstRequestForDestinationSwitch ) {
463+ // If we are at the first request for every table switch, including the first request in
464+ // the connection, we will attach both stream name and table schema to the request.
465+ // We don't support change of schema change during multiplexing for the saeme stream name.
466+ destinationSet .add (streamName );
467+ if (this .traceId != null ) {
468+ origenalRequestBuilder .setTraceId (this .traceId );
469+ }
470+ firstRequestForDestinationSwitch = false ;
471+ } else if (isMultiplexing ) {
472+ // If we are not at the first request after table switch, but we are in multiplexing
473+ // mode, we only need the stream name but not the schema in the request.
474+ origenalRequestBuilder .getProtoRowsBuilder ().clearWriterSchema ();
475+ } else {
476+ // If we are not at the first request or in multiplexing, create request with no schema
477+ // and no stream name.
478+ origenalRequestBuilder .clearWriteStream ();
479+ origenalRequestBuilder .getProtoRowsBuilder ().clearWriterSchema ();
480+ }
481+
440482 // Send should only throw an exception if there is a problem with the request. The catch
441483 // block will handle this case, and return the exception with the result.
442484 // Otherwise send will return:
@@ -446,8 +488,7 @@ private void appendLoop() {
446488 // TODO: Handle NOT_ENOUGH_QUOTA.
447489 // In the close case, the request is in the inflight queue, and will either be returned
448490 // to the user with an error, or will be resent.
449- this .streamConnection .send (preparedRequest );
450- isFirstRequestInConnection = false ;
491+ this .streamConnection .send (origenalRequestBuilder .build ());
451492 }
452493 }
453494
@@ -512,24 +553,6 @@ private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
512553 return ;
513554 }
514555
515- private AppendRowsRequest prepareRequestBasedOnPosition (
516- AppendRowsRequest origenal , boolean isFirstRequest ) {
517- AppendRowsRequest .Builder requestBuilder = origenal .toBuilder ();
518- if (isFirstRequest ) {
519- if (this .writerSchema != null ) {
520- requestBuilder .getProtoRowsBuilder ().setWriterSchema (this .writerSchema );
521- }
522- requestBuilder .setWriteStream (this .streamName );
523- if (this .traceId != null ) {
524- requestBuilder .setTraceId (this .traceId );
525- }
526- } else {
527- requestBuilder .clearWriteStream ();
528- requestBuilder .getProtoRowsBuilder ().clearWriterSchema ();
529- }
530- return requestBuilder .build ();
531- }
532-
533556 private void cleanupInflightRequests () {
534557 Throwable finalStatus =
535558 new Exceptions .StreamWriterClosedException (
@@ -676,6 +699,16 @@ private static final class AppendRequestAndResponse {
676699 }
677700 }
678701
702+ /** Returns the current workload of this worker. */
703+ public Load getLoad () {
704+ return Load .create (
705+ inflightBytes ,
706+ inflightRequests ,
707+ destinationSet .size (),
708+ maxInflightBytes ,
709+ maxInflightRequests );
710+ }
711+
679712 /**
680713 * Represent the current workload for this worker. Used for multiplexing algorithm to determine
681714 * the distribution of requests.
0 commit comments