Streaming Messages from Temporal Workers to SSE Clients

Streaming Messages from Temporal Workers to SSE Clients | Architecting Bytes

Combining Temporal and Redis PubSub to stream messages to clients

Temporal recently put out a wonderful demo of using Temporal to make prod-ready OpenAI agents, and I’ve seen many people asking a particular question about Temporal itself:

How do I stream responses from Temporal?

It’s a natural question, given how often SSE responses are used with LLMs. There are a few ways to approach this problem, but I will focus on a single solution. I built my own Worker(s) -> SSE stream implementation about a year ago to handle fan-out notifications, and will share it with you here. I don’t claim this is the ideal solution; instead, it is meant to tease out some creative solutions from you, the reader.

The Goals

We want to solve a few things:

Make a Temporal workflow that can “stream” data as it works through a problem of some kind

It should work across multiple workers

It should allow more than one “receiver”

Make a simple API

The Plan

We’ll use a few components to achieve this:

FastAPI for our API

Temporal’s Python SDK for our workers

Redis, for pub-sub streaming
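
To make the pub-sub choice concrete, here is a minimal in-memory sketch of the fan-out behavior we are relying on: every subscriber on a channel gets its own copy of each message. `InMemoryPubSub` is a hypothetical stand-in built on asyncio queues, not the Redis API; it only illustrates the semantics.

```python
import asyncio
from collections import defaultdict


class InMemoryPubSub:
    """Hypothetical stand-in for a pub-sub broker (NOT the Redis API)."""

    def __init__(self) -> None:
        # channel name -> one queue per subscriber
        self._subscribers = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue = asyncio.Queue()
        self._subscribers[channel].append(queue)
        return queue

    async def publish(self, channel: str, message: str) -> int:
        # Fan out: every subscriber receives its own copy.
        for queue in self._subscribers[channel]:
            await queue.put(message)
        return len(self._subscribers[channel])


async def demo() -> tuple:
    broker = InMemoryPubSub()
    first = broker.subscribe("chat-1")
    second = broker.subscribe("chat-1")
    delivered = await broker.publish("chat-1", "hello")
    return await first.get(), await second.get(), delivered
```

Running `asyncio.run(demo())` hands the same message to both receivers, which is the property that lets more than one SSE client follow a single workflow.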

I mention in my Temporal talk that I prefer diagramming workflows as timelines, and this time is no different: I always find it helpful to break down a Temporal flow that way.

Let’s Build

To keep things relatively simple, we are not going to incorporate the OpenAI SDK in this demo; instead, we are going to simulate it. This workflow will be a contrived example of getting text back from a distributed workflow run while the workflow is still running. If you’d like more detail about using the OpenAI SDK with Temporal, please check out the blog I linked above.

Our LLM Simulation

LLMs are great at talking a LOT. To simulate that, we’re going to use Faker with a generator. This gives our example a clean abstraction and allows anything with similar behavior (i.e., non-deterministic text generation) to be dropped right in. In fact, after checking out this post, head over to Dallas Young’s example repo.

    import asyncio
    from collections.abc import AsyncGenerator

    from faker import Faker

    async def conversation_generator() -> AsyncGenerator[str, None]:
        fake = Faker()
        for _ in range(10):
            await asyncio.sleep(1)  # simulate a slow LLM
            yield fake.sentence()

Nothing too surprising going on here; it’s just a simple async generator that pops out random sentences from Faker.
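
As a quick illustration of how a caller consumes an async generator like this one, here is a sketch using `async for`. `canned_generator` is a hypothetical stand-in that yields fixed sentences (no Faker, no real delay) so the result is predictable.

```python
import asyncio
from collections.abc import AsyncGenerator


async def canned_generator() -> AsyncGenerator[str, None]:
    # Hypothetical stand-in for conversation_generator(): fixed text, no Faker.
    for sentence in ("Hello there.", "Still talking.", "Almost done."):
        await asyncio.sleep(0)  # yield control, as a real slow source would
        yield sentence


async def collect() -> list[str]:
    # `async for` pulls one item at a time, awaiting between items.
    return [sentence async for sentence in canned_generator()]


sentences = asyncio.run(collect())
```

Anything that exposes the same "async iterator of strings" shape can be swapped in without the consumer changing.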

The Activities

Stepping one level up the implementation layers, we examine our stream-capable activities:

    from datetime import datetime

    from redis.asyncio import Redis  # assumed import: the async client from redis-py
    from temporalio import activity

    class MyActivities:
        def __init__(self, redis_client: Redis):
            # One shared client for every activity on this worker
            self.redis_client = redis_client

        @activity.defn
        async def stream_it(self, params: tuple[str, str]) -> None:
            channel_id, message = params
            await self.redis_client.publish(channel_id, message)

        @activity.defn
        async def converse(self, params: tuple[datetime, datetime]) -> str:
            stop_after, now = params

            if now < stop_after:
                # Return the first sentence; the workflow calls again for more
                async for sentence in conversation_generator():
                    return sentence

            return "Done yapping!"

First, we define these activities as methods on a class, instead of standalone functions, simply to share the Redis client. One could use plain functions as well, perhaps recreating the Redis client on each call or putting the client in global scope. Personally, I don’t like either of those options, so we have this class.
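
One practical upside of passing the client into the constructor is that the publishing logic can be exercised without a Redis server. The sketch below assumes a hypothetical `RecordingClient` with the same async `publish(channel, message)` shape as the client used above, and a stripped-down `Streamer` class without the Temporal decorators.

```python
import asyncio


class RecordingClient:
    """Hypothetical test double; records calls instead of talking to Redis."""

    def __init__(self) -> None:
        self.published = []

    async def publish(self, channel: str, message: str) -> int:
        self.published.append((channel, message))
        return 0  # no real subscribers in this sketch


class Streamer:
    """Same shape as stream_it above, minus the @activity.defn decorator."""

    def __init__(self, redis_client: RecordingClient) -> None:
        self.redis_client = redis_client

    async def stream_it(self, params: tuple[str, str]) -> None:
        channel_id, message = params
        await self.redis_client.publish(channel_id, message)


client = RecordingClient()
asyncio.run(Streamer(client).stream_it(("chat-1", "hello")))
```

With a global client or a client created per call, swapping in a double like this would take patching rather than a constructor argument.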

There are two activities here:

converse, the piece that actually gets sentences from our fake LLM

Note how the timing info is passed in through params rather than read inside the activity. The deadline belongs to the workflow, and workflow code runs inside Temporal’s Python SDK sandbox, where datetime.now(UTC) is considered non-deterministic. So instead we use workflow.now(), which we invoke from workflow scope, not activity scope, and hand both timestamps to the activity.

stream_it, the piece that sends some data off to Redis PubSub
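
Because both timestamps arrive as arguments, the deadline check in converse is a pure function of its inputs, which makes it easy to reason about and test. A minimal sketch, where `should_keep_talking` is a hypothetical helper name and the comparison direction (`now < stop_after`) is assumed from the workflow's intent:

```python
from datetime import datetime, timedelta, timezone


def should_keep_talking(stop_after: datetime, now: datetime) -> bool:
    # Pure comparison: no clock is read inside the function.
    return now < stop_after


start = datetime(2025, 1, 1, tzinfo=timezone.utc)
deadline = start + timedelta(seconds=15)

early = should_keep_talking(deadline, start + timedelta(seconds=5))
late = should_keep_talking(deadline, start + timedelta(seconds=20))
```

Here `early` is true and `late` is false, regardless of when the code actually runs.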

The Workflow

Next we look at the workflow that ties the activities together (another step up the layers)!

    from datetime import timedelta

    from temporalio import workflow

    @workflow.defn
    class TalkativeWorkflow:
        """A very talkative workflow!"""

        @workflow.run
        async def run(self, channel_id: str) -> str:
            # workflow.now() is the deterministic, replay-safe clock
            stop_yapping_time = workflow.now() + timedelta(seconds=15)

            while True:
                now = workflow.now()

                result = await workflow.execute_activity(
                    "converse",
                    (stop_yapping_time, now),
                    start_to_close_timeout=timedelta(seconds=60),
                )

                await workflow.execute_activity(
                    "stream_it",
                    (channel_id, result),
                    start_to_close_timeout=timedelta(seconds=60),
                )

                if result == "Done yapping!":
                    break

            return "Bye!"

This workflow enters a loop where it runs one “converse” and one “stream” activity per iteration, until it receives a terminal message from the source of conversation. In this case, the terminal message is “Done yapping!”
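
To see the control flow in isolation, here is a plain-asyncio simulation of that loop with no Temporal or Redis involved. Everything in it (`FakeClock`, `fake_converse`, the `published` list) is a hypothetical stand-in. One detail worth noticing: because the publish step runs before the terminal check, the “Done yapping!” message is itself streamed, so receivers could treat it as an end-of-stream marker.

```python
import asyncio
from datetime import datetime, timedelta, timezone

DONE = "Done yapping!"


class FakeClock:
    """Hypothetical clock that advances 5 seconds per reading."""

    def __init__(self) -> None:
        self.current = datetime(2025, 1, 1, tzinfo=timezone.utc)

    def now(self) -> datetime:
        value = self.current
        self.current += timedelta(seconds=5)
        return value


async def fake_converse(stop_after: datetime, now: datetime) -> str:
    # Stand-in for the converse activity: fixed text until the deadline.
    return "blah blah" if now < stop_after else DONE


async def run_loop() -> list[str]:
    clock = FakeClock()
    published = []
    stop_after = clock.now() + timedelta(seconds=15)
    while True:
        result = await fake_converse(stop_after, clock.now())
        published.append(result)  # stands in for the stream_it activity
        if result == DONE:
            break
    return published


messages = asyncio.run(run_loop())
```

With this fake clock the loop publishes two ordinary messages and then the terminal one before exiting.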

It’s important to point out that I am merely using a loop here as part of the LLM simulation. When interacting with a real LLM, or some other source...
