<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Tvrtko Sternak</title>
    <description>The latest articles on Forem by Tvrtko Sternak (@tvrtko).</description>
    <link>https://forem.com/tvrtko</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1176975%2Ff57c1252-f0a2-4d9d-b7ea-0eb3a08ad87c.jpeg</url>
      <title>Forem: Tvrtko Sternak</title>
      <link>https://forem.com/tvrtko</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/tvrtko"/>
    <language>en</language>
    <item>
      <title>FastStream: Python's framework for Efficient Message Queue Handling</title>
      <dc:creator>Tvrtko Sternak</dc:creator>
      <pubDate>Mon, 16 Oct 2023 14:28:26 +0000</pubDate>
      <link>https://forem.com/airtai/faststream-pythons-framework-for-efficient-message-queue-handling-3pd2</link>
      <guid>https://forem.com/airtai/faststream-pythons-framework-for-efficient-message-queue-handling-3pd2</guid>
      <description>&lt;p&gt;Ever felt lost in the complexity of microservices and message queues like Kafka and RabbitMQ? FastStream is here to simplify it all. That's precisely why we created FastStream. Initially, it was our solution to the challenges we faced with messaging queues in our own projects. But as it simplified our lives, we realized it could do the same for others. So, we decided to share it with the world.&lt;/p&gt;

&lt;p&gt;FastStream streamlines the entire process of working with message queues in microservices. Parsing messages, managing networking, and keeping documentation updated—all handled effortlessly.&lt;/p&gt;

&lt;p&gt;In this blog post, we'll explore how FastStream simplifies microservices development. Let's dive in and discover how FastStream can revolutionize your workflow.&lt;/p&gt;

&lt;p&gt;Hint: If you want to dive in the code right away, checkout the hands-on tutorial at &lt;a href="https://faststream.airt.ai/0.2/#install" rel="noopener noreferrer"&gt;FastStream documentation&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Our motivation
&lt;/h2&gt;

&lt;p&gt;Our journey with FastStream started when we needed to integrate our machine learning models into a customer's &lt;a href="https://kafka.apache.org/" rel="noopener noreferrer"&gt;Apache Kafka&lt;/a&gt; environment. To streamline this process, we created &lt;a href="https://github.com/airtai/fastkafka" rel="noopener noreferrer"&gt;FastKafka&lt;/a&gt; using &lt;a href="https://github.com/aio-libs/aiokafka" rel="noopener noreferrer"&gt;AIOKafka&lt;/a&gt;, &lt;a href="https://www.asyncapi.com/" rel="noopener noreferrer"&gt;AsyncAPI&lt;/a&gt;, and &lt;a href="https://docs.python.org/3/library/asyncio.html" rel="noopener noreferrer"&gt;asyncio&lt;/a&gt;. It was our first step in making message queue management easier.&lt;/p&gt;

&lt;p&gt;Later, we discovered &lt;a href="https://github.com/Lancetnik/Propan" rel="noopener noreferrer"&gt;Propan&lt;/a&gt;, a library created by &lt;a href="https://github.com/Lancetnik" rel="noopener noreferrer"&gt;Nikita Pastukhov&lt;/a&gt;, which solved similar problems but for &lt;a href="https://www.rabbitmq.com/" rel="noopener noreferrer"&gt;RabbitMQ&lt;/a&gt;. Recognizing the potential for collaboration, we joined forces with Nikita to build a unified library that could work seamlessly with both Kafka and RabbitMQ. And that's how FastStream came to be—a solution born out of the need for simplicity and efficiency in microservices development.&lt;/p&gt;

&lt;h2&gt;
  
  
  Key features that set FastStream apart 🚀
&lt;/h2&gt;

&lt;p&gt;FastStream is more than just another library; it's a powerful toolkit designed to simplify and supercharge your microservices development. Let's dive into the key features that make FastStream stand out:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Multiple Broker Support:&lt;/strong&gt; FastStream provides a unified API that works seamlessly across multiple message brokers. Whether you're dealing with Kafka, RabbitMQ, or others, FastStream has you covered, making it effortless to switch between them.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="n"&gt;broker&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaBroker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;localhost:9092&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="nd"&gt;@broker.publisher&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="err"&gt;“&lt;/span&gt;&lt;span class="n"&gt;prediction&lt;/span&gt;&lt;span class="err"&gt;”&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@broker.subscriber&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="err"&gt;“&lt;/span&gt;&lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="err"&gt;”&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;on_input_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;InputData&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Prediction&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
  &lt;span class="c1"&gt;# your processing processing
&lt;/span&gt;  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;prediction&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="c1"&gt;# Just change the broker class, 
#  rest of the code stays the same
&lt;/span&gt;&lt;span class="n"&gt;broker&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;RabbitBroker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;localhost:5672&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="nd"&gt;@broker.publisher&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="err"&gt;“&lt;/span&gt;&lt;span class="n"&gt;prediction&lt;/span&gt;&lt;span class="err"&gt;”&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@broker.subscriber&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="err"&gt;“&lt;/span&gt;&lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="err"&gt;”&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;on_input_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;InputData&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Prediction&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
  &lt;span class="c1"&gt;# your processing processing
&lt;/span&gt;  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;prediction&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;strong&gt;Pydantic Validation:&lt;/strong&gt; Leverage the robust validation capabilities of Pydantic to serialize and validate incoming messages. With Pydantic, you can ensure that your data is always in the right format.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="n"&gt;broker&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaBroker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;localhost:9092&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;app&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;FastStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="nd"&gt;@broker.subscriber&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="err"&gt;“&lt;/span&gt;&lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="err"&gt;”&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;on_input_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;InputData&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt; &lt;span class="c1"&gt;# &amp;lt;- decodes consumed message using InputData(**json.loads(data))
&lt;/span&gt;  &lt;span class="c1"&gt;# your processing logic
&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="n"&gt;broker&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaBroker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;localhost:9092&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;app&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;FastStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="nd"&gt;@broker.publisher&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="err"&gt;“&lt;/span&gt;&lt;span class="n"&gt;prediction&lt;/span&gt;&lt;span class="err"&gt;”&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@broker.subscriber&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="err"&gt;“&lt;/span&gt;&lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="err"&gt;”&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;on_input_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;InputData&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Prediction&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="c1"&gt;# &amp;lt;- encodes produced message using Prediction.json()
&lt;/span&gt;  &lt;span class="c1"&gt;# some processing
&lt;/span&gt;  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;prediction&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;strong&gt;Automatic Documentation:&lt;/strong&gt; FastStream keeps you ahead of the game with automatic AsyncAPI documentation generation. Say goodbye to outdated documentation – FastStream ensures it's always up-to-date.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8a7dii7makw1s48ltqx8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8a7dii7makw1s48ltqx8.png" alt="Basic FastStream documentation example"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Intuitive Development:&lt;/strong&gt; FastStream offers full-typed editor support, catching errors before they reach runtime. This means you can code with confidence, knowing that issues are caught early in the development process.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Powerful Dependency Injection System:&lt;/strong&gt; Manage your service dependencies efficiently with FastStream's built-in Dependency Injection (DI) system. Say goodbye to spaghetti code and embrace clean, modular architecture.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="n"&gt;broker&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaBroker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;localhost:9092&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;app&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;FastStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="nd"&gt;@broker.subscriber&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="err"&gt;“&lt;/span&gt;&lt;span class="n"&gt;input_data&lt;/span&gt;&lt;span class="err"&gt;”&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;# Load a global logger instance from faststream.Context()
&lt;/span&gt;&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;on_input_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;InputData&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;()):&lt;/span&gt;
  &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;strong&gt;Testability:&lt;/strong&gt; FastStream supports in-memory tests, making your Continuous Integration and Continuous Deployment (CI/CD) pipeline faster and more reliable. Test your microservices with ease, ensuring they perform as expected.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;test_base_app&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
  &lt;span class="c1"&gt;# Subscribe to prediction topic so that we can assert incoming msgs
&lt;/span&gt;  &lt;span class="nd"&gt;@broker.subscriber&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;prediction&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;on_prediction&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Prediction&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;pass&lt;/span&gt;

  &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;TestKafkaBroker&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# Publish a test message to the input_data topic
&lt;/span&gt;    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;broker&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;InputData&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;input_data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Check that the handle function for "input_data" topic was called with the correct msg
&lt;/span&gt;    &lt;span class="n"&gt;on_input_data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mock&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assert_called_once_with&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;InputData&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.2&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;

    &lt;span class="c1"&gt;# Check that the service responded with the correct prediction in the "prediction" topic
&lt;/span&gt;    &lt;span class="n"&gt;on_prediction&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mock&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;assert_called_once_with&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Prediction&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;score&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;1.2&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;strong&gt;Seamless Integrations:&lt;/strong&gt; FastStream plays well with others. It's fully compatible with any HTTP framework you prefer, with a special emphasis on compatibility with FastAPI.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="c1"&gt;# Create a FastStream router
&lt;/span&gt;&lt;span class="n"&gt;router&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaRouter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;localhost:9092&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="bp"&gt;...&lt;/span&gt;

&lt;span class="c1"&gt;# Connect a FastStream router to a FastAPI application lifespan
&lt;/span&gt;&lt;span class="n"&gt;app&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;FastAPI&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;lifespan&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;router&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;lifespan_context&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;strong&gt;Built for Automatic Code Generation:&lt;/strong&gt; FastStream is optimized for automatic code generation using advanced models like GPT. This means you can leverage the power of code generation to boost your productivity. Checkout the amazing tool we built for the microservice code generation: &lt;a href="https://github.com/airtai/faststream-gen/" rel="noopener noreferrer"&gt;faststream-gen&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fll4yodzbn4s387jpn6rb.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fll4yodzbn4s387jpn6rb.png" alt="FsLovesGPT"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpnk44x6v0u7zmm4mzxdz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpnk44x6v0u7zmm4mzxdz.png" alt="Image description"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;FastStream, in a nutshell, offers ease, efficiency, and power in your microservices development journey. Whether you're just starting or looking to scale your microservices, FastStream is your trusted companion. With these core features at your disposal, you'll be well-equipped to tackle the challenges of modern, data-centric microservices.&lt;/p&gt;

&lt;h2&gt;
  
  
  Let's build something!
&lt;/h2&gt;

&lt;p&gt;Now, let's get our hands a bit dirty 👷.&lt;br&gt;
Let's implement an example python app using &lt;strong&gt;FastStream&lt;/strong&gt; that consumes names from "persons" topic and outputs greetings to the "greetings" topic.&lt;/p&gt;

&lt;h3&gt;
  
  
  Cookiecutter project
&lt;/h3&gt;

&lt;p&gt;To start our project, we will use the prepared cookiecutter FastStream project. To find out more about it, check our &lt;a href="https://faststream.airt.ai/latest/getting-started/template/" rel="noopener noreferrer"&gt;detailed guide&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Install the &lt;a href="https://github.com/cookiecutter/cookiecutter" rel="noopener noreferrer"&gt;cookiecutter&lt;/a&gt; package using the following command:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

pip &lt;span class="nb"&gt;install &lt;/span&gt;cookiecutter


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Now, run the provided cookiecutter command and fill out the relevant details to generate a new FastStream project, we will name this project "greetings_app":&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;

cookiecutter https://github.com/airtai/cookiecutter-faststream.git


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The creation process should look like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

You&lt;span class="sb"&gt;`&lt;/span&gt;ve downloaded /Users/tvrtko/.cookiecutters/cookiecutter-faststream before. Is it okay to delete and re-download it? &lt;span class="o"&gt;[&lt;/span&gt;y/n] &lt;span class="o"&gt;(&lt;/span&gt;y&lt;span class="o"&gt;)&lt;/span&gt;: y
  &lt;span class="o"&gt;[&lt;/span&gt;1/4] username &lt;span class="o"&gt;(&lt;/span&gt;github-username&lt;span class="o"&gt;)&lt;/span&gt;: sternakt
  &lt;span class="o"&gt;[&lt;/span&gt;2/4] project_name &lt;span class="o"&gt;(&lt;/span&gt;My FastStream App&lt;span class="o"&gt;)&lt;/span&gt;: Greetings App
  &lt;span class="o"&gt;[&lt;/span&gt;3/4] project_slug &lt;span class="o"&gt;(&lt;/span&gt;greetings_app&lt;span class="o"&gt;)&lt;/span&gt;: greetings_app
  &lt;span class="o"&gt;[&lt;/span&gt;4/4] Select streaming_service
    1 - kafka
    2 - nats
    3 - rabbit
    Choose from &lt;span class="o"&gt;[&lt;/span&gt;1/2/3] &lt;span class="o"&gt;(&lt;/span&gt;1&lt;span class="o"&gt;)&lt;/span&gt;: 1
&lt;span class="sb"&gt;```&lt;/span&gt;

Change the working directory to the newly created directory and &lt;span class="nb"&gt;install &lt;/span&gt;all development requirements using pip:
&lt;span class="sb"&gt;```&lt;/span&gt;sh
&lt;span class="nb"&gt;cd &lt;/span&gt;greetings_app
pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="s2"&gt;".[dev]"&lt;/span&gt;
&lt;span class="sb"&gt;```&lt;/span&gt;

Now we are ready to edit our greetings_app/application.py and tests/test_application.py files to implement our application logic. 


&lt;span class="c"&gt;### Writing app code&lt;/span&gt;

&lt;span class="k"&gt;**&lt;/span&gt;FastStream&lt;span class="k"&gt;**&lt;/span&gt; brokers provide convenient &lt;span class="k"&gt;function &lt;/span&gt;decorators &lt;span class="sb"&gt;`&lt;/span&gt;@broker.subscriber&lt;span class="sb"&gt;`&lt;/span&gt; and &lt;span class="sb"&gt;`&lt;/span&gt;@broker.publisher&lt;span class="sb"&gt;`&lt;/span&gt; to allow you to delegate the actual process of:

- consuming and producing data to Event queues, and

- decoding and encoding JSON encoded messages

These decorators make it easy to specify the processing logic &lt;span class="k"&gt;for &lt;/span&gt;your consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying integration.

Also, &lt;span class="k"&gt;**&lt;/span&gt;FastStream&lt;span class="k"&gt;**&lt;/span&gt; uses &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="k"&gt;**&lt;/span&gt;Pydantic&lt;span class="k"&gt;**&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;https://docs.pydantic.dev/&lt;span class="o"&gt;)&lt;/span&gt; to parse input JSON-encoded data into Python objects, making it easy to work with structured data &lt;span class="k"&gt;in &lt;/span&gt;your applications, so you can serialize your input messages just using &lt;span class="nb"&gt;type &lt;/span&gt;annotations.

Here is an example python app we talked about:

&lt;span class="sb"&gt;```&lt;/span&gt;python
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
from pydantic import BaseModel, Field

version &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"0.1.0"&lt;/span&gt;
title &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"My FastStream service"&lt;/span&gt;
description &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s2"&gt;"Description of my FastStream service"&lt;/span&gt;


class Name&lt;span class="o"&gt;(&lt;/span&gt;BaseModel&lt;span class="o"&gt;)&lt;/span&gt;:
    name: str &lt;span class="o"&gt;=&lt;/span&gt; Field&lt;span class="o"&gt;(&lt;/span&gt;..., &lt;span class="nv"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"Name of the person"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;


class Greeting&lt;span class="o"&gt;(&lt;/span&gt;BaseModel&lt;span class="o"&gt;)&lt;/span&gt;:
    greeting: str &lt;span class="o"&gt;=&lt;/span&gt; Field&lt;span class="o"&gt;(&lt;/span&gt;..., &lt;span class="nv"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"Greeting message"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;


broker &lt;span class="o"&gt;=&lt;/span&gt; KafkaBroker&lt;span class="o"&gt;()&lt;/span&gt;
app &lt;span class="o"&gt;=&lt;/span&gt; FastStream&lt;span class="o"&gt;(&lt;/span&gt;broker, &lt;span class="nv"&gt;title&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;title, &lt;span class="nv"&gt;version&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;version, &lt;span class="nv"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;description&lt;span class="o"&gt;)&lt;/span&gt;

to_greetings &lt;span class="o"&gt;=&lt;/span&gt; broker.publisher&lt;span class="o"&gt;(&lt;/span&gt;
    &lt;span class="s2"&gt;"greetings"&lt;/span&gt;,
    &lt;span class="nv"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"Produces a message on greetings after receiving a meesage on names"&lt;/span&gt;,
&lt;span class="o"&gt;)&lt;/span&gt;


@broker.subscriber&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"names"&lt;/span&gt;, &lt;span class="nv"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"Consumes messages from names topic and produces messages to greetings topic"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
async def on_names&lt;span class="o"&gt;(&lt;/span&gt;msg: Name, logger: Logger&lt;span class="o"&gt;)&lt;/span&gt; -&amp;gt; None:
    result &lt;span class="o"&gt;=&lt;/span&gt; f&lt;span class="s2"&gt;"hello {msg.name}"&lt;/span&gt;
    logger.info&lt;span class="o"&gt;(&lt;/span&gt;result&lt;span class="o"&gt;)&lt;/span&gt;
    greeting &lt;span class="o"&gt;=&lt;/span&gt; Greeting&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;greeting&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;result&lt;span class="o"&gt;)&lt;/span&gt;
    await to_greetings.publish&lt;span class="o"&gt;(&lt;/span&gt;greeting&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="sb"&gt;```&lt;/span&gt;

The example application will subscribe to &lt;span class="k"&gt;**&lt;/span&gt;persons&lt;span class="k"&gt;**&lt;/span&gt; Kafka topic and consume Name JSON messages from it. When the application consumes a message it will publish a Greetings JSON message &lt;span class="k"&gt;**&lt;/span&gt;greetings&lt;span class="k"&gt;**&lt;/span&gt; topic.

We can save the application into the &lt;span class="sb"&gt;`&lt;/span&gt;application.py&lt;span class="sb"&gt;`&lt;/span&gt; file and &lt;span class="nb"&gt;let&lt;/span&gt;&lt;span class="s1"&gt;'s take a closer look at the code.

**Creating a broker**
To create an application, first we need to create a broker. This is the main piece of FastStream and takes care of the defining subscribers and producers.

```python
version = "0.1.0"
title = "My FastStream service"
description = "Description of my FastStream service"

...

broker = KafkaBroker()
app = FastStream(broker, title=title, version=version, description=description)
```

**Defining data structures**
Next, we need to define the structure of incoming and outgoing data. FastStream is integrated with Pydantic and offers automatic encoding and decoding of JSON formatted messages into Pydantic classes.

```python
class Name(BaseModel):
    name: str = Field(..., description="Name of the person")


class Greeting(BaseModel):
    greeting: str = Field(..., description="Greeting message")
```

**Defining a publisher**
Now, we define the publishing logic of our application.

```python
to_greetings = broker.publisher(
    "greetings",
    description="Produces a message on greetings after receiving a message on names",
)
```

**Defining a subscriber**
Finally, we can define the subscribing logic of our application. The app will consume data from the "names" topic and use the defined publisher to produce to the "greetings" topic whenever a message is consumed.

```python
@broker.subscriber("names", description="Consumes messages from names topic and produces messages to greetings topic")
async def on_names(msg: Name, logger: Logger) -&amp;gt; None:
    result = f"hello {msg.name}"
    logger.info(result)
    greeting = Greeting(greeting=result)
    await to_greetings.publish(greeting)
```

### Testing the service

The service can be tested using the `TestBroker` context managers, which, by default, puts the Broker into "testing mode".

The Tester will redirect your `subscriber` and `publisher` decorated functions to the InMemory brokers, allowing you to quickly test your app without the need for a running broker and all its dependencies.

Using pytest, the test for our service would look like this:

```python
import pytest
from faststream.kafka import TestKafkaBroker

from greetings_app.application import Greeting, Name, broker, on_names


# Subscribe to the "greetings" topic so we can monitor 
# messages our application is producing
@broker.subscriber("greetings")
async def on_greetings(msg: Greeting) -&amp;gt; None:
    pass


@pytest.mark.asyncio
async def test_on_names():
    async with TestKafkaBroker(broker):
        # Send John to "names" topic
        await broker.publish(Name(name="John"), "names")

        # Assert that our application has consumed "John"
        on_names.mock.assert_called_with(dict(Name(name="John")))

        # Assert that our application has greeted John in the "greetings" topic
        on_greetings.mock.assert_called_with(dict(Greeting(greeting="hello John")))
```

In the test, we send a test User JSON to the **in** topic, and then we assert that the broker has responded to the **out** topic with the appropriate message.

We can save the test to the test_application.py file and run the test by executing the following command in your application root file.

```shell
pytest
```

Here is how the tests execution should look like in your terminal:


```shell
===================================== test session starts =====================================
platform darwin -- Python 3.11.5, pytest-7.4.2, pluggy-1.3.0
rootdir: /Users/tvrtko/Documents/Airt Projects/FastStream/faststream-cookiecutter/greetings_app
configfile: pyproject.toml
plugins: asyncio-0.21.1, anyio-3.7.1
asyncio: mode=Mode.STRICT
collected 1 item                                                                              

tests/test_application.py .                                                             [100%]

====================================== 1 passed in 0.34s ======================================
```

### Running the application

The application can be started using built-in **FastStream** CLI command.

To run the service, use the **FastStream CLI** command and pass the module (in this case, the file where the app implementation is located) and the app symbol to the command.

``` shell
faststream run greetings_app.application:app
```

After running the command, you should see the following output:

``` shell
2023-10-13 08:36:32,162 INFO     - FastStream app starting...
2023-10-13 08:36:32,170 INFO     - names |            - `OnNames` waiting for messages
2023-10-13 08:36:32,177 INFO     - FastStream app started successfully! To exit, press CTRL+C
```

Also, **FastStream** provides you a great hot reload feature to improve your Development Experience

``` shell
faststream run greetings_app.application:app --reload
```

And multiprocessing horizontal scaling feature as well:

``` shell
faststream run greetings_app.application:app --workers 3
```

### Documentation

FastStream provides a command to serve the AsyncAPI documentation, let'&lt;/span&gt;s use it to document our application.
To generate and serve the documentation, run the following &lt;span class="nb"&gt;command&lt;/span&gt;:

&lt;span class="sb"&gt;```&lt;/span&gt;shell
faststream docs serve greetings_app.application:app
&lt;span class="sb"&gt;```&lt;/span&gt;

Now, you should see the following output:

&lt;span class="sb"&gt;```&lt;/span&gt;shell
INFO:     Started server process &lt;span class="o"&gt;[&lt;/span&gt;47151]
INFO:     Waiting &lt;span class="k"&gt;for &lt;/span&gt;application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:8000 &lt;span class="o"&gt;(&lt;/span&gt;Press CTRL+C to quit&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="sb"&gt;```&lt;/span&gt;

Now open your browser at &lt;span class="sb"&gt;`&lt;/span&gt;http://localhost:8000&lt;span class="sb"&gt;`&lt;/span&gt; and enjoy &lt;span class="k"&gt;in &lt;/span&gt;your automatically generated documentation! :tada:

&lt;span class="o"&gt;![&lt;/span&gt;Generated docs]&lt;span class="o"&gt;(&lt;/span&gt;https://dev-to-uploads.s3.amazonaws.com/uploads/articles/aa3d6fjauuqmbdvck6sx.png&lt;span class="o"&gt;)&lt;/span&gt;

Aaaand, that&lt;span class="s1"&gt;'s it! :tada: :tada: Feel free to experiment further with your application and checkout [the documentation](https://faststream.airt.ai/latest/) for more complex examples.

## Support us on GitHub and join our community :star:

Ready to join the FastStream revolution? Head over to our [GitHub repository](https://github.com/airtai/faststream) and show your support by starring it. By doing so, you'&lt;/span&gt;ll stay &lt;span class="k"&gt;in &lt;/span&gt;the loop with the latest developments, updates, and enhancements as we &lt;span class="k"&gt;continue &lt;/span&gt;to refine and &lt;span class="nb"&gt;expand &lt;/span&gt;FastStream.

Don&lt;span class="s1"&gt;'t forget, we also have an active Discord channel where you can connect with fellow FastStream enthusiasts, ask questions, and share your experiences. Your active participation in our growing community, whether on GitHub or Discord, is invaluable, and we'&lt;/span&gt;re grateful &lt;span class="k"&gt;for &lt;/span&gt;your interest and potential contributions. Together, we can make microservices development simpler and more efficient with FastStream.

&lt;span class="c"&gt;## Conclusion&lt;/span&gt;

FastStream is your go-to tool &lt;span class="k"&gt;for &lt;/span&gt;efficient microservices development. It simplifies message queues, supports various brokers, and offers Pydantic validation and auto-doc generation.

We&lt;span class="s1"&gt;'re immensely grateful for your interest, and we look forward to your potential contributions. With FastStream in your toolkit, you'&lt;/span&gt;re prepared to conquer the challenges of data-centric microservices like never before. Happy coding!
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

</description>
      <category>kafka</category>
      <category>python</category>
      <category>opensource</category>
    </item>
  </channel>
</rss>
