(openai responses): add websocket connection pool #4985
Conversation
```python
self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
    connect_cb=self._create_ws_conn,
    close_cb=self._close_ws,
    max_session_duration=3600,
```
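For context, a minimal sketch of the pooling pattern used here. This is a hypothetical stand-in for livekit's `utils.ConnectionPool` (the real implementation differs); it only illustrates the `connect_cb`/`close_cb`/`max_session_duration` contract visible in the diff:

```python
import time
from typing import Awaitable, Callable, Generic, TypeVar

T = TypeVar("T")


class ConnectionPool(Generic[T]):
    """Sketch: hand out pooled connections, retiring any past max age."""

    def __init__(
        self,
        *,
        connect_cb: Callable[[], Awaitable[T]],
        close_cb: Callable[[T], Awaitable[None]],
        max_session_duration: float = 3600.0,
    ) -> None:
        self._connect_cb = connect_cb
        self._close_cb = close_cb
        self._max_session_duration = max_session_duration
        self._idle: list[T] = []
        self._created_at: dict[int, float] = {}  # id(conn) -> creation time

    async def get(self) -> T:
        # Reuse an idle connection if it is still young enough.
        while self._idle:
            conn = self._idle.pop()
            age = time.monotonic() - self._created_at[id(conn)]
            if age < self._max_session_duration:
                return conn
            del self._created_at[id(conn)]
            await self._close_cb(conn)  # expired: close it and dial fresh
        conn = await self._connect_cb()
        self._created_at[id(conn)] = time.monotonic()
        return conn

    def put(self, conn: T) -> None:
        # Return a healthy connection to the idle set for later reuse.
        self._idle.append(conn)
```

The point of `max_session_duration` is that long-lived websocket sessions are eventually retired and re-dialed instead of being reused forever.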
But we can't reuse the same WS connection for different conversations, right? Even with store=True, the server has to rehydrate the chat history if the ws connection expects a different previous_response_id.
I could have worded my comment better, but each websocket connection is independent of previous_response_id
But the server-side in-memory cache for each connection seems very dependent on it:
On an active WebSocket connection, the service keeps one previous-response state in a connection-local in-memory cache (the most recent response). Continuing from that most recent response is fast because the service can reuse connection-local state. Because the previous-response state is retained only in memory and is not written to disk, you can use WebSocket mode in a way that is compatible with store=false and Zero Data Retention (ZDR).
If a previous_response_id is not in the in-memory cache, behavior depends on whether you store responses:
With store=true, the service may hydrate older response IDs from persisted state when available. Continuation can still work, but it usually loses the in-memory latency benefit.
With store=false (including ZDR), there is no persisted fallback. If the ID is uncached, the request returns previous_response_not_found.
Do we see previous_response_not_found when store=false if you reuse the same connection for two conversations?
Ah, we don't use `previous_response_id` when `store=False`; we just send the entire context in that case (relevant line)
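A sketch of that branching, with hypothetical function and parameter names (not the actual plugin code): when `store=False` the client never relies on `previous_response_id` and instead resends the full conversation each turn, which sidesteps the connection-local cache miss entirely:

```python
from typing import Optional


def build_request(
    store: bool,
    previous_response_id: Optional[str],
    full_context: list[dict],
    new_items: list[dict],
) -> dict:
    """Hypothetical sketch of the store=True/False branching discussed above."""
    if store and previous_response_id is not None:
        # Server can hit its connection-local cache or rehydrate from
        # persisted state, so send only the new items plus the pointer.
        return {
            "previous_response_id": previous_response_id,
            "input": new_items,
            "store": True,
        }
    # store=False (incl. ZDR): there is no persisted fallback, so never
    # depend on previous_response_id -- resend the entire context instead.
    return {"input": full_context + new_items, "store": False}
```

Under this scheme, reusing a pooled websocket across conversations is safe with `store=False` because no request ever references an ID the connection's cache might not hold.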
chenghao-mou
left a comment
mostly lgtm. Tested it locally and it worked well.
livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/responses/llm.py
```python
if isinstance(o, openai.BaseModel):
    return o.model_dump()
raise TypeError(f"unexpected type {type(o)}")

async def send_request(self, msg: dict) -> AsyncGenerator[dict, None]:
```
nit: the name seems misleading in the sense that it not only sends the request but also receives the responses. Maybe `process_request`?
how about generate_response?
chenghao-mou
left a comment
lgtm. One nit about APIStatusError construction.
```python
        f"OpenAI Responses WebSocket closed: {close_reason}",
        status_code=close_code,
        retryable=False,
    )
```
`raw_msg` has both `.data` and `.extra` that we can leverage:

```python
APIStatusError(
    "AssemblyAI connection closed unexpectedly",
    status_code=ws.close_code or -1,
    body=f"{msg.data=} {msg.extra=}",
)
```
when there are two parallel streams, restart and send the full context on the next request
each request will use its own websocket connection (edit: and each WS connection is independent of response IDs)