Serve a Chatbot with Request and Response Streaming

doc/source/serve/tutorials/streaming.md

(serve-streaming-tutorial)=

This example deploys a chatbot that streams output back to the user. It shows:

  • How to stream outputs from a Serve application
  • How to use WebSockets in a Serve application
  • How to combine batching requests with streaming outputs

This tutorial should help you with the following use cases:

  • You want to serve a large language model and stream results back token-by-token.
  • You want to serve a chatbot that accepts a stream of inputs from the user.

This tutorial serves the DialoGPT language model. Install Ray Serve, the Hugging Face transformers library, and PyTorch to access it:

pip install "ray[serve]" transformers torch

Create a streaming deployment

Open a new Python file called textbot.py. First, add the imports and the Serve logger.

{literalinclude}
:language: python
:start-after: __textbot_setup_start__
:end-before: __textbot_setup_end__

Create a FastAPI deployment, and initialize the model and the tokenizer in the constructor:

{literalinclude}
:language: python
:start-after: __textbot_constructor_start__
:end-before: __textbot_constructor_end__

Note that the constructor also caches the asyncio event loop. Caching the loop is useful when you need to run a model in a background thread and concurrently stream its tokens back to the user.

Add the following logic to handle requests sent to the Textbot:

{literalinclude}
:language: python
:start-after: __textbot_logic_start__
:end-before: __textbot_logic_end__

Textbot uses three methods to handle requests:

  • handle_request: the entrypoint for HTTP requests. FastAPI automatically unpacks the prompt query parameter and passes it into handle_request. This method then creates a TextIteratorStreamer. Hugging Face provides this streamer as a convenient interface to access tokens generated by a language model. handle_request then kicks off the model in a background thread using self.loop.run_in_executor. This behavior lets the model generate tokens while handle_request concurrently calls self.consume_streamer to stream the tokens back to the user. self.consume_streamer is a generator that yields tokens one by one from the streamer. Lastly, handle_request passes the self.consume_streamer generator into a Starlette StreamingResponse and returns the response. Serve unpacks the Starlette StreamingResponse and yields the contents of the generator back to the user one by one.
  • generate_text: the method that runs the model. This method runs in a background thread kicked off by handle_request. It pushes generated tokens into the streamer constructed by handle_request.
  • consume_streamer: a generator method that consumes the streamer constructed by handle_request. This method keeps yielding tokens from the streamer until the model in generate_text closes the streamer. This method avoids blocking the event loop by calling asyncio.sleep with a brief timeout whenever the streamer is empty and waiting for a new token.
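The interplay between the three methods can be sketched with the standard library alone. This is a minimal illustration, not the tutorial's code: a `queue.Queue` stands in for `TextIteratorStreamer`, the `_END` sentinel is a hypothetical end-of-stream marker, and `generate_text` fakes a model by upper-casing the words of the prompt. The sketch also collects the tokens in a list instead of returning a `StreamingResponse`.

```python
import asyncio
import queue
import time

# Hypothetical stand-in for Hugging Face's TextIteratorStreamer: a thread-safe
# queue that the producer fills and the consumer drains.
_END = object()  # sentinel that marks the end of generation (assumption)


def generate_text(prompt: str, streamer: queue.Queue) -> None:
    """Blocking producer; stands in for the model's generate call."""
    for token in prompt.split():
        time.sleep(0.01)  # pretend the model is busy computing
        streamer.put(token.upper() + " ")
    streamer.put(_END)  # close the stream


async def consume_streamer(streamer: queue.Queue):
    """Async generator that yields tokens without blocking the event loop."""
    while True:
        try:
            token = streamer.get_nowait()
        except queue.Empty:
            # Nothing ready yet: hand control back to the event loop briefly.
            await asyncio.sleep(0.001)
            continue
        if token is _END:
            break
        yield token


async def handle_request(prompt: str) -> list:
    loop = asyncio.get_running_loop()
    streamer = queue.Queue()
    # Run the blocking producer in the default thread pool executor.
    producer = loop.run_in_executor(None, generate_text, prompt, streamer)
    # Consume tokens concurrently while the producer thread is still running.
    tokens = [token async for token in consume_streamer(streamer)]
    await producer  # surface any exception raised in the worker thread
    return tokens


tokens = asyncio.run(handle_request("hello streaming world"))
print(tokens)
```

Polling with a short `asyncio.sleep` keeps the event loop free to serve other requests while the worker thread produces tokens, which is the same reason the tutorial's `consume_streamer` sleeps when the streamer is empty.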

Bind the Textbot to a language model. For this tutorial, use the "microsoft/DialoGPT-small" model:

{literalinclude}
:language: python
:start-after: __textbot_bind_start__
:end-before: __textbot_bind_end__

Run the model with serve run textbot:app, and query it from another terminal window with this script:

{literalinclude}
:language: python
:start-after: __stream_client_start__
:end-before: __stream_client_end__

You should see the output printed token by token.

Stream inputs and outputs using WebSockets

WebSockets let you stream input into the application and stream output back to the client. Use WebSockets to create a chatbot that stores a conversation with a user.

Create a Python file called chatbot.py. First add the imports:

{literalinclude}
:language: python
:start-after: __chatbot_setup_start__
:end-before: __chatbot_setup_end__

Create a FastAPI deployment, and initialize the model and the tokenizer in the constructor:

{literalinclude}
:language: python
:start-after: __chatbot_constructor_start__
:end-before: __chatbot_constructor_end__

Add the following logic to handle requests sent to the Chatbot:

{literalinclude}
:language: python
:start-after: __chatbot_logic_start__
:end-before: __chatbot_logic_end__

The generate_text and consume_streamer methods are the same as they were for the Textbot. The handle_request method has been updated to handle WebSocket requests.

The handle_request method is decorated with a fastapi_app.websocket decorator, which lets it accept WebSocket requests. First, it awaits ws.accept to accept the client's WebSocket connection. Then, until the client disconnects, it does the following:

  • gets the prompt from the client with ws.receive_text
  • starts a new TextIteratorStreamer to access generated tokens
  • runs the model in a background thread on the conversation so far
  • streams the model's output back using ws.send_text
  • stores the prompt and the response in the conversation string

Each time handle_request gets a new prompt from a client, it runs the whole conversation, with the new prompt appended, through the model. When the model finishes generating tokens, handle_request sends the "<<Response Finished>>" string to inform the client that the model has generated all tokens. handle_request continues to run until the client explicitly disconnects. This disconnect raises a WebSocketDisconnect exception, which ends the call.
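The control flow of this loop can be sketched without a server. Everything below is a hypothetical stand-in chosen for illustration: `FakeWebSocket` mimics only the `accept`, `receive_text`, and `send_text` methods, `Disconnect` plays the role of `WebSocketDisconnect`, and `fake_model` replaces the language model with a word counter. The newline separators in the conversation string are also an assumption of this sketch.

```python
import asyncio


class Disconnect(Exception):
    """Hypothetical stand-in for fastapi.WebSocketDisconnect."""


class FakeWebSocket:
    """Hypothetical in-memory socket exposing accept/receive_text/send_text."""

    def __init__(self, incoming):
        self.incoming = list(incoming)  # prompts the "client" will send
        self.sent = []                  # everything the server sends back

    async def accept(self):
        pass

    async def receive_text(self):
        if not self.incoming:
            raise Disconnect()  # the client has gone away
        return self.incoming.pop(0)

    async def send_text(self, text):
        self.sent.append(text)


async def fake_model(conversation: str):
    """Stand-in for the model: reports how many words it was given."""
    for token in ["I ", "saw ", f"{len(conversation.split())} ", "words"]:
        await asyncio.sleep(0)  # yield control, as a real stream would
        yield token


async def handle_request(ws) -> str:
    await ws.accept()
    conversation = ""
    try:
        while True:
            prompt = await ws.receive_text()
            conversation += prompt + "\n"
            response = ""
            # Run the whole conversation so far through the model.
            async for token in fake_model(conversation):
                await ws.send_text(token)
                response += token
            await ws.send_text("<<Response Finished>>")
            conversation += response + "\n"
    except Disconnect:
        pass  # the disconnect ends the call
    return conversation


ws = FakeWebSocket(["hello there", "how are you"])
conversation = asyncio.run(handle_request(ws))
print(conversation)
```

The second reply counts words from both the first prompt and the first response, which shows that the model sees the accumulated conversation rather than only the latest prompt.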

Read more about WebSockets in the FastAPI documentation.

Bind the Chatbot to a language model. For this tutorial, use the "microsoft/DialoGPT-small" model:

{literalinclude}
:language: python
:start-after: __chatbot_bind_start__
:end-before: __chatbot_bind_end__

Run the model with serve run chatbot:app. Query it using the websockets package, which you can install with pip install websockets:

{literalinclude}
:language: python
:start-after: __ws_client_start__
:end-before: __ws_client_end__

You should see the outputs printed token by token.

Batch requests and stream the output for each

Improve model utilization and request latency by batching requests together when running the model.

Create a Python file called batchbot.py. First add the imports:

{literalinclude}
:language: python
:start-after: __batchbot_setup_start__
:end-before: __batchbot_setup_end__

:::{warning} Hugging Face's support for Streamers is still under development and may change in the future. RawStreamer is compatible with the Streamers interface in Hugging Face transformers 4.30.2. However, the Streamers interface may change, making RawStreamer incompatible with Hugging Face models in the future. :::

Similar to Textbot and Chatbot, the Batchbot needs a streamer to stream outputs from batched requests, but Hugging Face Streamers don't support batched requests. Add this custom RawStreamer to process batches of tokens:

{literalinclude}
:language: python
:start-after: __raw_streamer_start__
:end-before: __raw_streamer_end__

Create a FastAPI deployment, and initialize the model and the tokenizer in the constructor:

{literalinclude}
:language: python
:start-after: __batchbot_constructor_start__
:end-before: __batchbot_constructor_end__

Unlike Textbot and Chatbot, the Batchbot constructor also sets a pad_token. You need to set this token to batch prompts with different lengths.
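To see why a pad token is needed, consider what batching does to token ID lists of different lengths. The sketch below is a plain-Python illustration, not the tokenizer's implementation: `pad_batch` and `PAD_ID` are hypothetical names, and padding on the right is a choice made for this example.

```python
def pad_batch(batch, pad_id):
    """Right-pad token ID lists to a common length and build attention masks."""
    max_len = max(len(ids) for ids in batch)
    padded, masks = [], []
    for ids in batch:
        n_pad = max_len - len(ids)
        padded.append(ids + [pad_id] * n_pad)
        # 1 marks a real token, 0 marks padding the model should ignore.
        masks.append([1] * len(ids) + [0] * n_pad)
    return padded, masks


PAD_ID = 0  # hypothetical pad token ID, for illustration only
input_ids, attention_mask = pad_batch([[5, 6, 7], [8]], PAD_ID)
print(input_ids)       # [[5, 6, 7], [8, 0, 0]]
print(attention_mask)  # [[1, 1, 1], [1, 0, 0]]
```

Without a pad token, the two prompts could not be stacked into one rectangular batch, so the model could not process them together.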

Add the following logic to handle requests sent to the Batchbot:

{literalinclude}
:language: python
:start-after: __batchbot_logic_start__
:end-before: __batchbot_logic_end__

Batchbot uses four methods to handle requests:

  • handle_request: the entrypoint method. This method simply takes in the request's prompt and calls the run_model method on it. run_model is a generator method that also handles batching the requests. handle_request passes run_model into a Starlette StreamingResponse and returns the response, so the bot can stream generated tokens back to the client.
  • run_model: a generator method that performs batching. Since run_model is decorated with @serve.batch, it automatically takes in a batch of prompts. See the batching guide for more info. run_model creates a RawStreamer to access the generated tokens. It calls generate_text in a background thread, and passes in the prompts and the streamer, similar to the Textbot. Then it iterates through the consume_streamer generator, repeatedly yielding a batch of tokens generated by the model.
  • generate_text: the method that runs the model. It's mostly the same as generate_text in Textbot, with two differences. First, it takes in and processes a batch of prompts instead of a single prompt. Second, it sets padding=True, so prompts with different lengths can be batched together.
  • consume_streamer: a generator method that consumes the streamer constructed by run_model. It's mostly the same as consume_streamer in Textbot, with one difference. It uses the tokenizer to decode the generated tokens. Usually, the Hugging Face streamer handles the decoding. Because this implementation uses the custom RawStreamer, consume_streamer must handle the decoding.

:::{tip} Some inputs within a batch may generate fewer outputs than others. When a particular input has nothing left to yield, pass a StopIteration object into the output iterable to terminate that input's request. See Streaming batched requests for more details. :::
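The shape of a batched stream, including the StopIteration marker, can be sketched in plain Python. This is a simulation under stated assumptions, not Serve's implementation: `run_model` here is an undecorated generator that fakes a model by splitting each prompt into words, and `fan_out` is a hypothetical helper that mimics how each request receives only its own slice of every yielded batch.

```python
import asyncio


async def run_model(prompts):
    """Yield one list per step, with one entry per prompt in the batch."""
    # Stand-in for a model: each prompt "generates" its own words.
    outputs = [p.split() for p in prompts]
    for step in range(max(len(o) for o in outputs)):
        await asyncio.sleep(0)
        # Prompts with nothing left to yield get StopIteration instead.
        yield [o[step] if step < len(o) else StopIteration for o in outputs]


def _is_stop(item) -> bool:
    # Accept either the StopIteration class or an instance of it.
    return item is StopIteration or isinstance(item, StopIteration)


async def fan_out(prompts):
    """Hypothetical dispatcher: route each batch entry to its own request."""
    per_request = [[] for _ in prompts]
    finished = [False] * len(prompts)
    async for batch in run_model(prompts):
        for i, item in enumerate(batch):
            if finished[i]:
                continue  # this request has already terminated
            if _is_stop(item):
                finished[i] = True
            else:
                per_request[i].append(item)
    return per_request


streams = asyncio.run(fan_out(["hi", "how are you"]))
print(streams)  # [['hi'], ['how', 'are', 'you']]
```

The shorter prompt finishes after one step, while the longer one keeps streaming, so every yielded list still has one entry per request in the batch.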

Bind the Batchbot to a language model. For this tutorial, use the "microsoft/DialoGPT-small" model:

{literalinclude}
:language: python
:start-after: __batchbot_bind_start__
:end-before: __batchbot_bind_end__

Run the model with serve run batchbot:app. Query it from two other terminal windows with this script:

{literalinclude}
:language: python
:start-after: __stream_client_start__
:end-before: __stream_client_end__

You should see the output printed token by token in both windows.