FastStream 0.7: MQTT support – in-memory tests, AsyncAPI generation and more


Release v0.7.0 · ag2ai/faststream · GitHub


v0.7.0


Lancetnik released this on 01 Jun 20:42 · tag 0.7.0 · commit 7cb9390

This commit was created on GitHub.com and signed with GitHub’s verified signature (GPG key ID: B5690EEEBB952194).

What's Changed

🚀 MQTT Support

FastStream now includes a full-featured MQTT broker, installable via pip install "faststream[mqtt]" (the quotes keep shells such as zsh from expanding the brackets). It supports wildcard topic filters, path parameter capture via Path(), QoS levels, per-subscriber ack_policy, and AsyncAPI schema generation.

    from typing import Annotated

    from faststream import FastStream, Path
    from faststream.mqtt import MQTTBroker, MQTTMessage, QoS

    broker = MQTTBroker("localhost:1883")
    app = FastStream(broker)

    # "{device_id}" captures one topic level and is injected through Path()
    @broker.subscriber(
        "sensors/{device_id}/temperature",
        qos=QoS.AT_LEAST_ONCE,
    )
    async def on_temperature(body: str, device_id: Annotated[str, Path()]) -> None:
        print(device_id, body)

    @app.after_startup
    async def publish_demo() -> None:
        await broker.publish(21.5, "sensors/room1/temperature", qos=QoS.AT_LEAST_ONCE)
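To make the path-capture idea concrete, here is a minimal standard-library sketch of how a template such as "sensors/{device_id}/temperature" could map onto an MQTT single-level wildcard filter and how the captured value could be recovered from a concrete topic. This is not FastStream's implementation; the helper names `template_to_filter` and `extract_path_params` are hypothetical, and the sketch only handles named placeholders and literal levels (no `#` multi-level wildcard).

```python
import re
from typing import Optional


def _is_placeholder(level: str) -> bool:
    """True for a level written as {name}."""
    return len(level) > 2 and level.startswith("{") and level.endswith("}")


def template_to_filter(template: str) -> str:
    """Replace each {name} level with the MQTT single-level wildcard '+'."""
    return "/".join(
        "+" if _is_placeholder(level) else level
        for level in template.split("/")
    )


def extract_path_params(template: str, topic: str) -> Optional[dict]:
    """Return captured placeholder values, or None if the topic does not match."""
    parts = []
    for level in template.split("/"):
        if _is_placeholder(level):
            # Simplification: a named capture must be one non-empty level.
            parts.append(f"(?P<{level[1:-1]}>[^/]+)")
        else:
            parts.append(re.escape(level))
    match = re.fullmatch("/".join(parts), topic)
    return match.groupdict() if match else None


if __name__ == "__main__":
    template = "sensors/{device_id}/temperature"
    print(template_to_filter(template))
    print(extract_path_params(template, "sensors/room1/temperature"))
```

Under these assumptions, the subscription sent to the MQTT server would use the wildcard filter, while the per-message capture happens on the client side before the handler is called.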

🔀 Multi-broker Support

A single FastStream application can now run multiple brokers at the same time. Pass all the brokers directly to the FastStream constructor — each keeps its own subscribers and publishers, and the app starts and stops all of them together. A common use case is bridging two systems: consume from one broker and re-publish to another.

    from faststream import FastStream
    from faststream.kafka import KafkaBroker
    from faststream.nats import NatsBroker

    kafka_broker = KafkaBroker("localhost:9092")
    nats_broker = NatsBroker("nats://localhost:4222")

    app = FastStream(kafka_broker, nats_broker)

    @kafka_broker.subscriber("incoming")
    @nats_broker.publisher("outgoing")
    async def from_kafka(msg: str) -> str:
        # Bridge the message from Kafka to NATS
        return msg

    @nats_broker.subscriber("outgoing")
    async def from_nats(msg: str) -> None:
        print(f"Received from NATS: {msg}")
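The shared lifecycle and the bridging pattern can be illustrated without any real broker. The following toy model uses plain asyncio; `InMemoryBroker` and `App` are hypothetical stand-ins written for this sketch, not FastStream classes, and they deliver messages synchronously in memory rather than over a network.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[None]]


class InMemoryBroker:
    """Hypothetical stand-in for a broker: keeps its own handlers per topic."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.running = False
        self._handlers = defaultdict(list)

    def subscriber(self, topic: str) -> Callable[[Handler], Handler]:
        def decorator(func: Handler) -> Handler:
            self._handlers[topic].append(func)
            return func
        return decorator

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False

    async def publish(self, msg: str, topic: str) -> None:
        if not self.running:
            raise RuntimeError(f"{self.name} is not running")
        for handler in self._handlers[topic]:
            await handler(msg)


class App:
    """Hypothetical app: starts and stops every broker together."""

    def __init__(self, *brokers: InMemoryBroker) -> None:
        self.brokers = brokers

    async def __aenter__(self) -> "App":
        await asyncio.gather(*(b.start() for b in self.brokers))
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        await asyncio.gather(*(b.stop() for b in self.brokers))


async def main():
    source = InMemoryBroker("source")
    sink = InMemoryBroker("sink")
    received = []

    @source.subscriber("incoming")
    async def bridge(msg: str) -> None:
        # Consume from one broker and re-publish to the other.
        await sink.publish(msg, "outgoing")

    @sink.subscriber("outgoing")
    async def collect(msg: str) -> None:
        received.append(msg)

    async with App(source, sink):
        await source.publish("hello", "incoming")

    return received, [source.running, sink.running]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The point of the model is the ownership split: each broker holds only its own subscribers, while the application object is the single place that drives startup and shutdown for all of them.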

🗄️ Redis Cluster Support

FastStream's Redis broker now has a dedicated RedisClusterBroker that connects to a Redis Cluster with automatic node discovery. It is a drop-in replacement for RedisBroker — just change the class name and point it at any cluster node.

    from faststream import FastStream
    from faststream.redis import RedisClusterBroker

    # A single URL is enough; the cluster auto-discovers all remaining nodes
    broker = RedisClusterBroker("redis://node1:7000")
    app = FastStream(broker)

    @broker.subscriber("events")
    async def handle_event(msg: str) -> None:
        print(f"Received: {msg}")

    @app.after_startup
    async def publish_event() -> None:
        await broker.publish("hello from cluster", "events")

⚠️ Breaking Changes

AsyncAPIRoute parameter renames (PR #2894)

The AsyncAPIRoute class (used in ASGI hosting) replaces two parameters, try_it_out and try_it_out_url, with a single try_it_out_path:

| Before | After | Notes |
| --- | --- | --- |
| try_it_out=False | try_it_out_path=None | Disabling try-it-out now uses None instead of False |
| try_it_out_url="..." | try_it_out_path="..." | Parameter renamed for clarity |

    # Before
    AsyncAPIRoute("/docs/asyncapi", try_it_out=False)
    AsyncAPIRoute("/docs/asyncapi", try_it_out_url="https://api.example.com/asyncapi/try")

    # After
    AsyncAPIRoute("/docs/asyncapi", try_it_out_path=None)
    AsyncAPIRoute("/docs/asyncapi", try_it_out_path="https://api.example.com/asyncapi/try")
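For codebases that build the route arguments dynamically, the rename can be applied to a keyword-argument dictionary before it reaches AsyncAPIRoute. The helper below is a hypothetical migration aid, not part of FastStream; it encodes only the two mappings listed above and assumes that an old try_it_out=True with no URL should simply be dropped so the new default applies.

```python
from typing import Any, Dict

_MISSING = object()  # distinguishes "key absent" from an explicit None


def migrate_asyncapi_route_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Translate pre-0.7 AsyncAPIRoute keyword arguments to the 0.7 names."""
    new = dict(kwargs)  # never mutate the caller's dictionary
    old_flag = new.pop("try_it_out", _MISSING)
    old_url = new.pop("try_it_out_url", _MISSING)

    if old_url is not _MISSING:
        # try_it_out_url="..." becomes try_it_out_path="..."
        new["try_it_out_path"] = old_url
    if old_flag is False:
        # try_it_out=False becomes try_it_out_path=None (disabled wins)
        new["try_it_out_path"] = None
    # Assumption: try_it_out=True carried no extra information, so it is dropped.
    return new


if __name__ == "__main__":
    print(migrate_asyncapi_route_kwargs({"try_it_out": False}))
    print(migrate_asyncapi_route_kwargs(
        {"try_it_out_url": "https://api.example.com/asyncapi/try"}
    ))
```

The result can then be passed on as AsyncAPIRoute("/docs/asyncapi", **migrated), which keeps the call sites unchanged while the old names are phased out.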

Additionally, a new asyncapi_json_path...
