(serve-streaming-tutorial)=

# Streaming Tutorial

This guide walks you through deploying a chatbot that streams output back to the user. It shows:

* How to stream outputs from a Serve application
* How to use WebSockets in a Serve application
* How to combine request batching with streaming outputs

This tutorial should help you with the following use cases:

* You want to serve a large language model and stream results back token by token.
* You want to serve a chatbot that accepts a stream of inputs from the user.

This tutorial serves the [DialoGPT](https://huggingface.co/microsoft/DialoGPT-small) language model. Install the HuggingFace `transformers` library to access it:

```
pip install transformers
```

## Create a Streaming Deployment

Open a new Python file called `textbot.py`. First, add the imports and the [Serve logger](serve-logging):

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __textbot_setup_start__
:end-before: __textbot_setup_end__
```

Create a [FastAPI deployment](serve-fastapi-http), and initialize the model and the tokenizer in the constructor:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __textbot_constructor_start__
:end-before: __textbot_constructor_end__
```

Note that the constructor also caches an `asyncio` loop. The cached loop lets you run the model in a background thread while concurrently streaming its tokens back to the user.

Add the following logic to handle requests sent to the `Textbot`:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __textbot_logic_start__
:end-before: __textbot_logic_end__
```

`Textbot` uses three methods to handle requests (a condensed sketch follows the list):

* `handle_request`: the entrypoint for HTTP requests. FastAPI automatically unpacks the `prompt` query parameter and passes it into `handle_request`. This method then creates a `TextIteratorStreamer`. HuggingFace provides this streamer as a convenient interface for accessing tokens generated by a language model. `handle_request` then kicks off the model in a background thread using `self.loop.run_in_executor`, which lets the model generate tokens while `handle_request` concurrently calls `self.consume_streamer` to stream the tokens back to the user. `self.consume_streamer` is a generator that yields tokens one by one from the streamer. Lastly, `handle_request` passes the `self.consume_streamer` generator into a Starlette `StreamingResponse` and returns the response. Serve unpacks the `StreamingResponse` and yields the contents of the generator back to the user one by one.
* `generate_text`: the method that runs the model. This method runs in a background thread kicked off by `handle_request`. It pushes generated tokens into the streamer constructed by `handle_request`.
* `consume_streamer`: a generator method that consumes the streamer constructed by `handle_request`. This method keeps yielding tokens from the streamer until the model in `generate_text` closes the streamer. It avoids blocking the event loop by calling `asyncio.sleep` with a brief timeout whenever the streamer is empty and waiting for a new token.
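If you want to see the pattern at a glance, the following condensed sketch shows how these three methods fit together. It mirrors the structure described above, but details such as the endpoint path, `max_new_tokens`, and the streamer's `timeout` value are illustrative assumptions rather than the tutorial's exact code:

```python
import asyncio
from queue import Empty

from fastapi import FastAPI
from starlette.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

from ray import serve

fastapi_app = FastAPI()


@serve.deployment
@serve.ingress(fastapi_app)
class Textbot:
    def __init__(self, model_id: str):
        # Cache the event loop so the model can run in a background thread
        # while tokens stream back to the client on this loop.
        self.loop = asyncio.get_running_loop()
        self.model = AutoModelForCausalLM.from_pretrained(model_id)
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)

    @fastapi_app.post("/")
    async def handle_request(self, prompt: str) -> StreamingResponse:
        # timeout=0 makes the streamer raise queue.Empty immediately when no
        # token is ready, so consume_streamer can yield control to the loop.
        streamer = TextIteratorStreamer(
            self.tokenizer, timeout=0, skip_prompt=True, skip_special_tokens=True
        )
        # Run the model in a background thread; stream tokens concurrently.
        self.loop.run_in_executor(None, self.generate_text, prompt, streamer)
        return StreamingResponse(
            self.consume_streamer(streamer), media_type="text/plain"
        )

    def generate_text(self, prompt: str, streamer: TextIteratorStreamer):
        input_ids = self.tokenizer([prompt], return_tensors="pt").input_ids
        # generate() pushes tokens into the streamer and closes it when done.
        self.model.generate(input_ids, streamer=streamer, max_new_tokens=64)

    async def consume_streamer(self, streamer: TextIteratorStreamer):
        while True:
            try:
                for token in streamer:
                    yield token
                break  # The streamer is exhausted once generation finishes.
            except Empty:
                # No token is ready yet; sleep briefly instead of blocking the loop.
                await asyncio.sleep(0.001)
```

The key design choice is running `generate_text` in a thread pool: `model.generate` is blocking, so running it directly on the event loop would stall every other request on the replica while tokens are being generated.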
Bind the `Textbot` to a language model. For this tutorial, use the `"microsoft/DialoGPT-small"` model:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __textbot_bind_start__
:end-before: __textbot_bind_end__
```

Run the model with `serve run textbot:app`, and query it from another terminal window with this script:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __stream_client_start__
:end-before: __stream_client_end__
```

You should see the output printed token by token.

## Stream inputs and outputs using WebSockets

WebSockets let you stream inputs into the application and stream outputs back to the client. Use WebSockets to create a chatbot that stores a conversation with the user.

Create a Python file called `chatbot.py`. First, add the imports:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __chatbot_setup_start__
:end-before: __chatbot_setup_end__
```

Create a FastAPI deployment, and initialize the model and the tokenizer in the constructor:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __chatbot_constructor_start__
:end-before: __chatbot_constructor_end__
```

Add the following logic to handle requests sent to the `Chatbot`:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __chatbot_logic_start__
:end-before: __chatbot_logic_end__
```

The `generate_text` and `consume_streamer` methods are the same as in `Textbot`. The `handle_request` method is updated to handle WebSocket requests: it's decorated with `fastapi_app.websocket`, which lets it accept WebSocket connections. First, it accepts the client's WebSocket connection with an `await` call. Then, until the client disconnects, it does the following:

* gets the prompt from the client with `ws.receive_text`
* starts a new `TextIteratorStreamer` to access generated tokens
* runs the model in a background thread on the conversation so far
* streams the model's output back using `ws.send_text`
* stores the prompt and the response in the `conversation` string

Each time `handle_request` gets a new prompt from the client, it runs the whole conversation, with the new prompt appended, through the model. When the model finishes generating tokens, `handle_request` sends the `"<>"` string to inform the client that all tokens have been generated. `handle_request` continues to run until the client explicitly disconnects. The disconnect raises a `WebSocketDisconnect` exception, which ends the call.

Read more about WebSockets in the [FastAPI documentation](https://fastapi.tiangolo.com/advanced/websockets/).
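The following fragment sketches this loop. Assume it lives inside the `Chatbot` deployment class and that the constructor, `generate_text`, and `consume_streamer` are the same as `Textbot`'s; the plain string concatenation for `conversation` is a simplification of however the full tutorial code joins turns:

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from transformers import TextIteratorStreamer

from ray import serve

fastapi_app = FastAPI()


@serve.deployment
@serve.ingress(fastapi_app)
class Chatbot:
    # __init__, generate_text, and consume_streamer are the same as Textbot's.

    @fastapi_app.websocket("/")
    async def handle_request(self, ws: WebSocket) -> None:
        await ws.accept()

        conversation = ""
        try:
            while True:
                # Wait for the client's next prompt.
                prompt = await ws.receive_text()
                conversation += prompt
                streamer = TextIteratorStreamer(
                    self.tokenizer, timeout=0, skip_prompt=True, skip_special_tokens=True
                )
                # Run the model on the whole conversation in a background thread.
                self.loop.run_in_executor(
                    None, self.generate_text, conversation, streamer
                )
                response = ""
                async for token in self.consume_streamer(streamer):
                    await ws.send_text(token)
                    response += token
                # Tell the client that generation for this prompt is finished.
                await ws.send_text("<>")
                conversation += response
        except WebSocketDisconnect:
            # The client disconnected, so end the call.
            pass
```

Because the connection stays open across prompts, the per-user conversation state can live in a local variable for the duration of the call instead of an external store.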
Bind the `Chatbot` to a language model. For this tutorial, use the `"microsoft/DialoGPT-small"` model:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __chatbot_bind_start__
:end-before: __chatbot_bind_end__
```

Run the model with `serve run chatbot:app`. Query it using the `websockets` package (`pip install websockets`):

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __ws_client_start__
:end-before: __ws_client_end__
```

You should see the outputs printed token by token.

## Batch requests and stream the output for each

Improve model utilization and request latency by batching requests together when running the model.

Create a Python file called `batchbot.py`. First, add the imports:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __batchbot_setup_start__
:end-before: __batchbot_setup_end__
```

:::{warning}
HuggingFace's support for `Streamers` is still under development and may change in the future. The custom `RawStreamer` used in this tutorial is compatible with the `Streamers` interface in HuggingFace 4.30.2. However, the `Streamers` interface may change, making the `RawStreamer` incompatible with HuggingFace models in the future.
:::

Just like `Textbot` and `Chatbot`, `Batchbot` needs a streamer to stream outputs from batched requests. However, HuggingFace `Streamers` don't support batched requests yet, so add this custom `RawStreamer` to process batches of tokens:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __raw_streamer_start__
:end-before: __raw_streamer_end__
```

Create a FastAPI deployment, and initialize the model and the tokenizer in the constructor:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __batchbot_constructor_start__
:end-before: __batchbot_constructor_end__
```

Unlike `Textbot` and `Chatbot`, the `Batchbot` constructor also sets a `pad_token`. This token must be set so that prompts with different lengths can be batched together.

Add the following logic to handle requests sent to the `Batchbot`:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __batchbot_logic_start__
:end-before: __batchbot_logic_end__
```

`Batchbot` uses four methods to handle requests (a condensed sketch follows the tip below):

* `handle_request`: the entrypoint method. It takes in the request's prompt and calls the `run_model` method on it. `run_model` is a generator method that also handles batching the requests. `handle_request` passes `run_model` into a Starlette `StreamingResponse` and returns the response, so generated tokens can stream back to the client.
* `run_model`: a generator method that performs batching. Because `run_model` is decorated with `@serve.batch`, it automatically takes in a batch of prompts. See the [batching guide](serve-batch-tutorial) for more info. `run_model` creates a `RawStreamer` to access the generated tokens. It calls `generate_text` in a background thread and passes in the `prompts` and the `streamer`, similar to `Textbot`. Then it iterates through the `consume_streamer` generator, repeatedly yielding a batch of tokens generated by the model.
* `generate_text`: the method that runs the model. It's mostly the same as `generate_text` in `Textbot`, with two differences. First, it takes in and processes a batch of prompts instead of a single prompt. Second, it sets `padding=True`, so prompts with different lengths can be batched together.
* `consume_streamer`: a generator method that consumes the streamer constructed by `run_model`. It's mostly the same as `consume_streamer` in `Textbot`, with one difference: it uses the `tokenizer` to decode the generated tokens. Usually the HuggingFace streamer handles decoding, but because this implementation uses the custom `RawStreamer`, `consume_streamer` must handle the decoding itself.

:::{tip}
Some inputs within a batch may generate fewer outputs than others. When a particular input has nothing left to yield, pass a `StopIteration` object into the output iterable to terminate that input's request. See [this section](serve-streaming-batched-requests-guide) for more info.
:::
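To make the batching flow concrete, here's a condensed sketch of `handle_request`, `run_model`, and `generate_text`. Assume it lives in the `Batchbot` deployment class alongside the `RawStreamer` defined above; the batch size, wait timeout, and `max_new_tokens` values are illustrative assumptions:

```python
from typing import List

from fastapi import FastAPI
from starlette.responses import StreamingResponse

from ray import serve

fastapi_app = FastAPI()


@serve.deployment
@serve.ingress(fastapi_app)
class Batchbot:
    # __init__ and consume_streamer follow the tutorial code; consume_streamer
    # decodes the raw token IDs that RawStreamer yields.

    @fastapi_app.post("/")
    async def handle_request(self, prompt: str) -> StreamingResponse:
        # @serve.batch transparently groups this prompt with other concurrent
        # prompts and routes each yielded token back to the right caller.
        return StreamingResponse(self.run_model(prompt), media_type="text/plain")

    @serve.batch(max_batch_size=4, batch_wait_timeout_s=0.1)
    async def run_model(self, prompts: List[str]):
        streamer = RawStreamer()
        # Generate text for the whole batch of prompts in a background thread.
        self.loop.run_in_executor(None, self.generate_text, prompts, streamer)
        async for decoded_token_batch in self.consume_streamer(streamer):
            # Each iteration yields a list with one token per prompt in the batch.
            yield decoded_token_batch

    def generate_text(self, prompts: List[str], streamer: "RawStreamer"):
        # padding=True relies on the pad_token set in the constructor and lets
        # prompts of different lengths share one batch.
        inputs = self.tokenizer(prompts, return_tensors="pt", padding=True)
        self.model.generate(**inputs, streamer=streamer, max_new_tokens=64)
```

Note how the shape of the data changes: `run_model` receives a list of prompts and yields a list of tokens of the same length on every iteration, which is how Serve knows which token belongs to which request.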
Bind the `Batchbot` to a language model. For this tutorial, use the `"microsoft/DialoGPT-small"` model:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __batchbot_bind_start__
:end-before: __batchbot_bind_end__
```

Run the model with `serve run batchbot:app`. Query it from two other terminal windows with this script:

```{literalinclude} ../doc_code/streaming_tutorial.py
:language: python
:start-after: __stream_client_start__
:end-before: __stream_client_end__
```

You should see the output printed token by token in both windows.
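If you'd rather not open two terminal windows, a hypothetical variant of the client sends two concurrent requests from one script so you can watch both streams interleave. The endpoint path, HTTP method, and `prompt` query parameter are assumptions; adjust them to match the client script above:

```python
from concurrent.futures import ThreadPoolExecutor

import requests


def stream_prompt(prompt: str) -> None:
    # stream=True keeps the connection open so tokens print as they arrive.
    response = requests.post(
        "http://localhost:8000/", params={"prompt": prompt}, stream=True
    )
    response.raise_for_status()
    for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
        print(f"[{prompt}] {chunk}", flush=True)


# Two concurrent requests land in the same batch if they arrive within the
# batch wait timeout.
with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(stream_prompt, ["Hello there!", "What do you like to eat?"]))
```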