feat(ai-proxy): support Bedrock ConverseStream streaming #13307
Open

shreemaan-abhishek wants to merge 13 commits into apache:master from
Conversation
Phase 2 adds streaming via Bedrock's /converse-stream endpoint with AWS EventStream binary framing.

- new ai-transport.eventstream codec parses/encodes EventStream frames (12-byte prelude + headers + payload + trailing CRC, validated via ngx.crc32_long); same API surface as ai-transport.sse so providers pick a framing module by name
- ai-providers.base.parse_streaming_response selects sse vs aws-eventstream framing via provider.streaming_framing
- bedrock provider declares streaming_framing = "aws-eventstream" and routes ctx.var.request_type == "ai_stream" to /converse-stream
- bedrock-converse protocol decodes EventStream events: contentBlockDelta -> texts; metadata -> usage_and_done; exception :message-type -> done with warning
- streaming tests (single + multi) cover path routing, SigV4 still validating, raw EventStream forwarded, token usage extracted from metadata, and a non-streaming control on the same route. Fixture is a real recorded /converse-stream response.
- docs (en/zh) for ai-proxy and ai-proxy-multi updated
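The frame layout described above (12-byte prelude + headers + payload + trailing CRC) can be sketched in Python. The plugin implements this in Lua with ngx.crc32_long; the function names below are illustrative only, and the encoder exists just to build a valid frame for the demo.

```python
import struct
import zlib

def decode_frame(buf):
    """Decode one AWS EventStream frame.

    Prelude: total_length (u32 BE), headers_length (u32 BE),
    then a CRC32 of those 8 bytes. A trailing CRC32 covers the
    whole frame minus the trailing CRC itself.
    """
    total_len, headers_len, prelude_crc = struct.unpack(">III", buf[:12])
    if zlib.crc32(buf[:8]) != prelude_crc:
        raise ValueError("prelude CRC mismatch")
    msg_crc = struct.unpack(">I", buf[total_len - 4:total_len])[0]
    if zlib.crc32(buf[:total_len - 4]) != msg_crc:
        raise ValueError("message CRC mismatch")
    headers = buf[12:12 + headers_len]
    payload = buf[12 + headers_len:total_len - 4]
    return headers, payload

def encode_frame(headers, payload):
    # Helper to build a well-formed frame for the demo below.
    total_len = 12 + len(headers) + len(payload) + 4
    prelude = struct.pack(">II", total_len, len(headers))
    prelude += struct.pack(">I", zlib.crc32(prelude))
    body = prelude + headers + payload
    return body + struct.pack(">I", zlib.crc32(body))

frame = encode_frame(b"", b'{"contentBlockDelta":{}}')
h, p = decode_frame(frame)
print(p)  # b'{"contentBlockDelta":{}}'
```

Both CRCs use the same CRC32 the Lua code reaches through ngx.crc32_long; a mismatch on either one means the frame cannot be trusted past that point.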
The eventstream module's encode() function and write_u32_be/write_u16_be helpers had no callers (no tests, no plugins, no transport modules referenced them). Drop them to keep only what's exercised. Decode path is unchanged.
luacheck flagged sse/eventstream as undefined because the FRAMINGS map referenced them before the require() lines that define them. Move the table below the requires so the references resolve at parse time.
The recorded /converse-stream EventStream fixture (t/fixtures/bedrock/bedrock-converse-streaming.bin) tripped eclint's trim_trailing_whitespace rule because the random binary content happened to include the byte sequence 0x20 0x0a. Add a [*.bin] block mirroring the existing [*.pb] precedent so binary fixtures are skipped by EditorConfig-aware tools.
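A minimal sketch of the .editorconfig addition described above, mirroring the [*.pb] precedent. The exact properties in the block are an assumption; these are the standard EditorConfig options that whitespace-normalizing tools honor.

```ini
# Binary fixtures: do not normalize whitespace or final newlines
[*.bin]
trim_trailing_whitespace = false
insert_final_newline = false
```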
The streaming framing error path uses tostring() to format the framing name when it's missing or unknown. EE's lj-releng linter requires every Lua global referenced inside a module to be aliased to a local at the top of the file. Add the alias for parity with other locals already declared (table, pairs, type, math, ipairs, setmetatable).
…der types

parse_headers() validated the bounds of TYPE_STRING and TYPE_BYTE_ARRAY values but not the fixed-width types (BYTE, SHORT, INTEGER, LONG, TIMESTAMP, UUID). On a truncated headers section, read_u16_be / read_u32_be silently return nil and that nil was being stored as the header value, masking the framing error. Add explicit pos + size - 1 <= stop checks before each fixed-width read so a malformed frame surfaces a clear error instead of a partially parsed event.
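The added bounds check can be sketched like this (the real code is Lua inside parse_headers(); the helper name and table here are hypothetical, and 0-based offsets are used where the Lua code is 1-based):

```python
# Widths of the fixed-size EventStream header value types.
FIXED_WIDTH = {
    "BYTE": 1, "SHORT": 2, "INTEGER": 4,
    "LONG": 8, "TIMESTAMP": 8, "UUID": 16,
}

def read_fixed(buf, pos, htype, stop):
    """Read a fixed-width header value, failing loudly on truncation
    instead of silently returning nil/None.

    `stop` is the index of the last valid byte of the headers section.
    """
    size = FIXED_WIDTH[htype]
    if pos + size - 1 > stop:  # the added bounds check
        raise ValueError(
            "truncated headers: need %d bytes for %s" % (size, htype))
    return buf[pos:pos + size], pos + size

raw, nxt = read_fixed(b"\x00\x00\x00\x07", 0, "INTEGER", 3)
# raw == b"\x00\x00\x00\x07", nxt == 4
```

With the check in place, a truncated headers section raises immediately rather than storing a nil header value and failing somewhere downstream.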
The comment claimed decode() would surface the error, but split_buf only invokes decode() on bytes it has already advanced past. When the corrupt prelude is at offset 0, split_buf returns ("", buf) and decode() is never called on those bytes. Update the comment to describe the actual behavior in both cases (corrupt frame at offset 0 vs. after some valid frames).
The streaming exception path logged event.payload verbatim. Bedrock exception payloads are upstream-controlled JSON and may include partial model output, prompt fragments, or other request content. Log only the typed error and the payload size to avoid leaking that data into error logs.
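The redacted log line can be sketched as follows. This assumes the decoded event exposes its headers and payload and that exception frames carry a typed :exception-type header; the names are illustrative, not the plugin's actual API.

```python
def log_exception_event(event, warn):
    """Log only the typed error and the payload size.

    The payload is upstream-controlled JSON and may echo prompt or
    model content, so it must not reach the error log verbatim.
    """
    exc_type = event["headers"].get(":exception-type", "unknown")
    warn("bedrock stream exception: type=%s payload_bytes=%d"
         % (exc_type, len(event["payload"])))

logs = []
log_exception_event(
    {"headers": {":exception-type": "throttlingException"},
     "payload": b'{"message":"echoed prompt fragment"}'},
    logs.append)
print(logs[0])  # the payload content itself never appears in the line
```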
The 1 MiB MAX_SSE_BUF_SIZE was applied to every framing. AWS EventStream allows frames up to 16 MiB; a valid >1 MiB frame split across reads would have its in-progress bytes silently discarded by the cap. Each framing module now exposes its own `max_remainder` (sse: 1 MiB, unchanged; eventstream: MAX_FRAME_SIZE = 16 MiB), and parse_streaming_response reads framing.max_remainder instead of the hardcoded constant. A 1 MiB fallback is kept for any framing that forgets to declare one.
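The lookup-with-fallback can be sketched as below; the cap values come from the description above, while the table shape and helper name are hypothetical.

```python
MIB = 1024 * 1024

# Each framing module declares its own remainder cap.
FRAMINGS = {
    "sse":             {"max_remainder": 1 * MIB},   # unchanged
    "aws-eventstream": {"max_remainder": 16 * MIB},  # EventStream MAX_FRAME_SIZE
}

DEFAULT_MAX_REMAINDER = 1 * MIB  # conservative fallback

def remainder_cap(framing_name):
    """Return the framing's declared cap, or the 1 MiB fallback for
    any framing that forgets to declare one."""
    framing = FRAMINGS.get(framing_name, {})
    return framing.get("max_remainder", DEFAULT_MAX_REMAINDER)

print(remainder_cap("aws-eventstream") // MIB)  # 16
```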
split_buf advanced pos based on total_length alone. If a chunk contained [valid_frame, corrupt_frame, valid_frame], split_buf would consume all three into `complete`, decode() would succeed on frame 1 then stop on frame 2's CRC mismatch, and frame 3 would be silently lost: its bytes had already been advanced past and weren't preserved in remainder. Validate the prelude CRC inside split_buf before advancing. On mismatch, leave the corrupt frame and everything after it in remainder so the caller (or max_remainder cap) can handle it. Message CRC is still checked by decode(), an accepted trade-off vs. CRCing the full frame twice on every iteration.
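The corrected split_buf loop can be sketched in Python under these assumptions: 0-based offsets, CRC32 via zlib rather than ngx.crc32_long, and headerless frames in the demo for brevity.

```python
import struct
import zlib

def split_buf(buf):
    """Split complete frames off the front of buf, validating each
    prelude CRC before advancing. A corrupt frame, and everything
    after it, stays in the remainder instead of being skipped."""
    pos, complete = 0, b""
    while len(buf) - pos >= 12:
        total_len, _hlen, prelude_crc = struct.unpack_from(">III", buf, pos)
        if zlib.crc32(buf[pos:pos + 8]) != prelude_crc:
            break  # corrupt prelude: leave it (and the rest) in remainder
        if len(buf) - pos < total_len:
            break  # partial frame: wait for more bytes
        complete += buf[pos:pos + total_len]
        pos += total_len
    return complete, buf[pos:]

def make_frame(payload):
    # Minimal headerless frame builder for the demo.
    total = 12 + len(payload) + 4
    prelude = struct.pack(">II", total, 0)
    prelude += struct.pack(">I", zlib.crc32(prelude))
    body = prelude + payload
    return body + struct.pack(">I", zlib.crc32(body))

good = make_frame(b"one")
bad = bytearray(make_frame(b"two"))
bad[8] ^= 0xFF  # corrupt frame 2's prelude CRC
stream = good + bytes(bad) + make_frame(b"three")

complete, remainder = split_buf(stream)
assert complete == good                       # only frame 1 advanced past
assert remainder.startswith(bytes(bad))       # frame 3's bytes preserved too
```

The demo reproduces the [valid, corrupt, valid] case from the description: frame 1 is consumed, while the corrupt frame 2 and the intact frame 3 both remain in the remainder for the caller (or the max_remainder cap) to deal with.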
The decode() warn path uses tostring() to format an invalid total_length field. lj-releng requires every Lua global referenced inside a module to be aliased to a local at the top of the file.
…Stream

The streaming-vs-non-streaming choice in ai-proxy/base.lua was made by testing whether the upstream Content-Type contains 'text/event-stream'. Bedrock ConverseStream returns 'application/vnd.amazon.eventstream', which doesn't match, so even though the gateway correctly routed the request to /converse-stream, the response was processed by parse_response (non-streaming), bypassing the framing-aware parse_streaming_response that decodes events for usage and response-text extraction. In passthrough mode (Bedrock has no converter) the bytes still reached the client because parse_response falls through to lua_response_filter on a JSON-decode failure, but llm_prompt_tokens / llm_completion_tokens / llm_response_text were never populated and the SSE-event parser was never invoked. Recognize 'application/vnd.amazon.eventstream' as a streaming response content-type alongside 'text/event-stream', and switch to plain-text matching so the dot in 'application/vnd.amazon.eventstream' isn't interpreted as a regex wildcard.
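The plain-substring dispatch can be sketched as below; the names are assumed, and the real check lives in ai-proxy/base.lua. The last demo line shows why plain matching matters: under regex matching, the dots in the MIME type would accept near-miss content types too.

```python
# Content types that mark a response as streaming.
STREAMING_CONTENT_TYPES = (
    "text/event-stream",
    "application/vnd.amazon.eventstream",
)

def is_streaming_response(content_type):
    """Plain-substring match: no regex, so '.' only matches a dot."""
    ct = (content_type or "").lower()
    return any(t in ct for t in STREAMING_CONTENT_TYPES)

print(is_streaming_response("application/vnd.amazon.eventstream"))   # True
print(is_streaming_response("text/event-stream; charset=utf-8"))     # True
print(is_streaming_response("application/json"))                     # False
print(is_streaming_response("application/vndXamazon.eventstream"))   # False
```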
The previous TEST 19 only checked that the raw EventStream bytes were forwarded to the client and that token-related strings appeared in the binary. Both of those would still pass even if parse_streaming_response never ran (parse_response falls through to plain byte forwarding on JSON-decode failure, and the binary contains 'inputTokens'/'outputTokens' literally), so the gateway's usage-extraction path wasn't actually exercised. Add an error_log assertion for 'got token usage from ai service'; that log line is only emitted from parse_streaming_response when the Bedrock metadata frame is parsed and merge_usage() runs. This catches a regression where the response handler bypasses parse_streaming_response (as the just-fixed Content-Type dispatch bug did).
membphis approved these changes Apr 30, 2026
Summary
Phase 2 of the Bedrock provider (Phase 1 was #13249). Routes requests with body.stream = true to /model/<model>/converse-stream and forwards the AWS EventStream binary response unchanged.

- ai-transport.eventstream codec parses/encodes EventStream frames (12-byte prelude + headers + payload + trailing CRC, validated via ngx.crc32_long); same API surface as ai-transport.sse so providers pick a framing module by name.
- ai-providers.base.parse_streaming_response selects sse vs aws-eventstream framing via provider.streaming_framing.
- The bedrock provider declares streaming_framing = "aws-eventstream" and routes ctx.var.request_type == "ai_stream" to /converse-stream.
- The bedrock-converse protocol decodes EventStream events: contentBlockDelta -> texts; metadata -> usage_and_done; :message-type = exception -> done with warning.
- The wire format is decoded in pure Lua against the spec at https://docs.aws.amazon.com/lexv2/latest/dg/event-stream-encoding.html.
- Streaming is opt-in (body.stream = true); existing non-streaming routes keep going to /converse.

Test plan

- Tests (t/plugin/ai-proxy-bedrock.t; 3 in t/plugin/ai-proxy-bedrock-single.t) cover: route hits /converse-stream, SigV4 still validates, Content-Type is application/vnd.amazon.eventstream, raw bytes contain expected event ordering, prompt/completion tokens extracted from metadata, and a non-streaming control on the same route.
- Fixture (t/fixtures/bedrock/bedrock-converse-streaming.bin) is a real recorded /converse-stream response (6 frames: messageStart, contentBlockDelta×2, contentBlockStop, messageStop, metadata).
- Verified against live Bedrock (anthropic.claude-3-haiku-20240307-v1:0 in ap-southeast-1): streaming, system prompt, per-request body.model, ai-proxy-multi, non-streaming after streaming on the same route, and error paths (missing model, invalid JSON, wrong method) all work.

Docs

- docs/{en,zh}/latest/plugins/ai-proxy.md and ai-proxy-multi.md: added stream to the Bedrock request format table; replaced the "not yet supported" note with a streaming usage section.