<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Mohammed</title>
    <description>The latest articles on Forem by Mohammed (@mohdizzy).</description>
    <link>https://forem.com/mohdizzy</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F350035%2F18d11ea5-11a2-4de3-808d-e6d92f12dad7.jpg</url>
      <title>Forem: Mohammed</title>
      <link>https://forem.com/mohdizzy</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/mohdizzy"/>
    <language>en</language>
    <item>
      <title>Setting up data pipelines from S3 to Kafka with Flink as an ETL layer</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Sun, 25 Jan 2026 07:03:25 +0000</pubDate>
      <link>https://forem.com/aws-builders/setting-up-data-pipelines-from-s3-to-kafka-with-flink-as-an-etl-layer-2cdk</link>
      <guid>https://forem.com/aws-builders/setting-up-data-pipelines-from-s3-to-kafka-with-flink-as-an-etl-layer-2cdk</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;When you are working with Kafka, there are plenty of ways out there to move data from individual topics to other storage mediums. One of the simplest methods is to use Kafka Connect, as it allows setting up these data pipelines with minimal configuration and a no-code setup (almost). Whether replicating topics between clusters or pushing data to S3, all of that is achievable with plugins from Confluent like MirrorMaker2 and S3 Sink Connector, among other,s for corresponding systems.&lt;/p&gt;

&lt;p&gt;However, I recently ran into a situation where I needed to move volumes of streaming data from S3 into Kafka using MSK Connect (a managed solution from AWS for Kafka Connect). I needed the ability to selectively pull data from S3 and not the whole data set. For this, there are plugins available, but it came with a catch of not being free to use. I did manage to find some options with other providers, but not as viable as the ones Confluent publishes. I also needed to perform some data transformations as the streams of data in S3 were not in the format the consumers expected.&lt;/p&gt;

&lt;p&gt;When you hit a wall (paywall in this case), you get creative. &lt;em&gt;Enter Flink.&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Flink for S3 to Kafka ETL
&lt;/h2&gt;

&lt;p&gt;Apache Flink is a distributed processing engine capable of handling both bounded and unbounded streams. If you haven’t worked with Flink before or need a refresher on the fundamentals, I’d recommend checking out this article, where I walk through the core concepts.&lt;/p&gt;

&lt;p&gt;Using Flink as an ETL layer between S3 and Kafka offers several advantages over MSK Connect with commercial connectors:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Selective data ingestion: Choose exactly which files to process from a specified date range and specific folders.&lt;/li&gt;
&lt;li&gt;Data transformation: Handle multiple data formats and standardize them before pushing to Kafka&lt;/li&gt;
&lt;li&gt;Unified processing: Leverage Flink’s rich API for complex transformations if needed&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  The Flow
&lt;/h2&gt;

&lt;p&gt;The flow is straightforward. Files land in an S3 bucket, Flink reads from S3 using the FileSource API, applies transformations and filtering logic, and pushes the processed events to the intended Kafka topics.&lt;/p&gt;

&lt;p&gt;Here’s what the setup looks like:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Source: S3 bucket containing your data files (JSON, CSV, Parquet, etc.)&lt;/li&gt;
&lt;li&gt;Processing layer: Flink job running on AWS Managed Service for Apache Flink (MSF)&lt;/li&gt;
&lt;li&gt;Sink: Kafka topic on AWS MSK&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Implementation
&lt;/h3&gt;

&lt;p&gt;Let’s assume there is data related to an e-commerce store that needs to be pushed into Kafka topics for a replay. There are two specific data feeds: order &amp;amp; payment. The order stream is gzip-compressed, and the payment data stream needs to be transformed to only pull a subset of the attributes from the JSON.&lt;/p&gt;

&lt;p&gt;The data in S3 was already partitioned by the year, month, day &amp;amp; hour. Using the FileSource connector, the data can be selectively pulled from S3. What filtering you could achieve with commercial S3 source connectors, the FileSource Connector on Flink gives you all the control you need and some more.&lt;/p&gt;

&lt;p&gt;The resulting DataStream is then subjected to the custom transformer logic via a map operator. The ability to apply specific transformation logic per feed is what this setup excels in. The order feed just needs to be gzip-decompressed, whereas the payment feed requires selective pulling of some data elements.&lt;/p&gt;

&lt;p&gt;The transformed DataStream is then Sinked-to the designated Kafka topic.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// Create FileSource for reading from S3
FileSource&amp;lt;String&amp;gt; fileSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path(s3BasePath))
        .setFileEnumerator(
                () -&amp;gt; new org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator() {
                    private boolean isFiltered(org.apache.flink.core.fs.Path path) {
                        String pathStr = path.getPath();
                        boolean matches = dateFilterPattern.matcher(pathStr).find();
                        return !matches;
                    }
                })
        .build();

// Create data stream from S3
DataStream&amp;lt;String&amp;gt; s3Stream = env.fromSource(
        fileSource,
        WatermarkStrategy.noWatermarks(),
        "S3-Historical-Data-Source-" + feedName.toUpperCase());

// Transform the payloads before pushing to Kafka
DataStream&amp;lt;String&amp;gt; transformedStream;
        if (transformerEnabled) {
            DataTransformer transformer = createTransformer();
            System.out.println("Using transformer: " + transformer.getName());

            // Apply transformation
            transformedStream = s3Stream
                    .map(new TransformMapFunction(transformer))
                    .name("Transform-" + feedName.toUpperCase())
                    .filter(new FilterFunction&amp;lt;String&amp;gt;() {
                        @Override
                        public boolean filter(String value) throws Exception {
                            return value != null &amp;amp;&amp;amp; !value.trim().isEmpty();
                        }
                    })
                    .name("Filter-Null-Records");
        }

// Create KafkaSink
KafkaSink&amp;lt;String&amp;gt; kafkaSink = KafkaSink.&amp;lt;String&amp;gt;builder()
        .setBootstrapServers(bootstrapServers)
        .setKafkaProducerConfig(kafkaSinkProperties)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(topicName)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

// Write to Kafka
transformedStream.sinkTo(kafkaSink).name("Kafka-Sink-" + topicName);

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Local testing
&lt;/h3&gt;

&lt;p&gt;You can get the two jobs up and running in your local environment to simulate the whole behaviour using Docker.&lt;br&gt;
Become a member&lt;/p&gt;

&lt;p&gt;Kafka UI gives you a nice view of seeing all the incoming message streams from the intended topics.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frf6uah72n8sizp5hh3yh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Frf6uah72n8sizp5hh3yh.png" alt="Order Replay topic" width="800" height="442"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgjaq0hq6exmnf5smn5ac.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgjaq0hq6exmnf5smn5ac.png" alt="Payment Replay topic" width="800" height="438"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;With LocalStack, the job can pull data from an actual S3 bucket so that you can test the whole flow and evaluate how things would work in the cloud environment.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyfkq3sgv65my0ygrryh8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyfkq3sgv65my0ygrryh8.png" alt="S3 bucket in LocalStack" width="800" height="441"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here is a &lt;a href="https://github.com/mohdizzy/flink-etl" rel="noopener noreferrer"&gt;link&lt;/a&gt; to the Github repo.&lt;/p&gt;

&lt;h3&gt;
  
  
  What it also solves
&lt;/h3&gt;

&lt;p&gt;Multiple data formats is the other pain point I had to deal with because the streams of data ingested into S3 were from multiple sources, with none of them having a consistent structure. Some feeds might be in JSON, others in CSV or compressed with GZIP. The limitation with Kafka Connect is that you cannot apply any kind of transformation to the data.&lt;/p&gt;

&lt;p&gt;I was able to transform each of the data streams with specific transformer logic. Going one step further, you can even leverage Flink’s state management if you need to maintain context across multiple files or perform stateful transformations. This level of flexibility is hard to achieve with connector-based solutions.&lt;/p&gt;

&lt;h3&gt;
  
  
  Cost considerations
&lt;/h3&gt;

&lt;p&gt;While MSK Connect itself is relatively affordable, when running at scale, the costs can go up quite fast. Coupled with that, if you have licensing costs for the connectors, then you are paying a lot more than what you had imagined.&lt;/p&gt;

&lt;p&gt;With Flink, you pay for the compute resources (KPUs) that MSF provisions. Depending on your workload, this can result in significant cost savings as you fine-tune your processing logic.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;When to use Flink vs MSK Connect&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Flink as an ETL layer makes sense when:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;You need advanced filtering or transformation capabilities.
-You’re dealing with multiple data formats that require normalization.&lt;/li&gt;
&lt;li&gt;Commercial licensing costs are prohibitive.&lt;/li&gt;
&lt;li&gt;You want the flexibility to extend processing logic over time.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;MSK Connect might still be the better choice for:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Simple, straightforward S3-to-Kafka ingestion with no transformations.&lt;/li&gt;
&lt;li&gt;Teams without Flink expertise who prefer managed connector solutions.&lt;/li&gt;
&lt;li&gt;Workloads where the free open-source connectors meet all requirements.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Wrapping up
&lt;/h2&gt;

&lt;p&gt;Before making a decision, mapping out your requirements should be step one. I’d personally prefer going with a managed solution that requires no code management, even if it means paying a little extra. While giving the additional layer of controls, going with the Flink approach does introduce an additional effort for code &amp;amp; infrastructure maintenance. You get regex-based filtering, multi-format transformation support, and the ability to scale processing logic as your requirements evolve.&lt;/p&gt;

&lt;p&gt;If you’re already using Flink for other stream processing tasks, adding S3-to-Kafka ETL into your existing infrastructure is a natural fit. And if you’re new to Flink, this could be a great entry point to explore its capabilities.&lt;/p&gt;

&lt;p&gt;If you run into any questions or want to share your experience, feel free to reach out!&lt;/p&gt;

</description>
      <category>aws</category>
      <category>apacheflink</category>
      <category>kafka</category>
      <category>s3</category>
    </item>
    <item>
      <title>Rust vs Node — Kafka producer performance with AWS Lambda</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Fri, 31 Jan 2025 13:26:35 +0000</pubDate>
      <link>https://forem.com/aws-builders/rust-vs-node-kafka-producer-performance-with-aws-lambda-26mk</link>
      <guid>https://forem.com/aws-builders/rust-vs-node-kafka-producer-performance-with-aws-lambda-26mk</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;If you have heard of Rust, you are probably aware of how performant the language is when compared to the other programming languages of today. To explore how much of a difference we are talking I ran a very simple load test on two AWS Lambda functions; one on Node and the other on Rust. Both run the exact same logic by pushing a sample payload to Kafka. The results were interesting.&lt;/p&gt;

&lt;h2&gt;
  
  
  The setup
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwsbs7ebnwkprzxfdzv4b.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwsbs7ebnwkprzxfdzv4b.png" alt="Rust vs Node Kafka producer" width="586" height="268"&gt;&lt;/a&gt;&lt;br&gt;
The code as I mentioned, takes an incoming event payload and pushes it to a Kafka topic. I’ve set up an MSK cluster for this. Various factors contribute to application performance, and one can be payload size. I ensured to use a payload size of 256KB as I imagine that most use cases would do just fine with a size as large as this one.&lt;/p&gt;

&lt;p&gt;The load test was done using &lt;a href="https://www.artillery.io/" rel="noopener noreferrer"&gt;Artillery&lt;/a&gt;. It’s perfect for running tests quickly without requiring too much of an effort on configuration. Using an API Gateway configured on both Lambda functions, Artillery will hit the endpoint(s) along with the payload and defined throughput.&lt;/p&gt;

&lt;p&gt;To monitor the performance metrics, I decided to use &lt;a href="https://opentelemetry.io/" rel="noopener noreferrer"&gt;OpenTelemetry&lt;/a&gt; with &lt;strong&gt;Datadog&lt;/strong&gt;. Otel if you aren’t aware, is an observability tool/framework for instrumenting your application to gather logs, traces &amp;amp; metrics. It enables you to publish all collected data to a monitoring platform of your choice by following a consistent pattern making it vendor-agnostic as long as the platform supports Otel. Datadog has good support for Otel and offers a host of features under its product umbrella when it comes to complete visibility for your applications.&lt;/p&gt;

&lt;p&gt;Instrumenting the Node function was quite straightforward as I only had to include the Otel Layers and employ the use of the CDK Datadog construct. The Rust function (also having the CDK construct), on the other hand, required a bit of a manual approach since auto-instrumentation is not available. I was able to cobble together some pieces by referring this &lt;a href="https://github.com/DataDog/serverless-sample-app" rel="noopener noreferrer"&gt;repository&lt;/a&gt; from James Eastham.&lt;/p&gt;

&lt;p&gt;The Rust Kafka client library I used internally can be explored from this &lt;a href="https://github.com/fede1024/rust-rdkafka" rel="noopener noreferrer"&gt;repository&lt;/a&gt;. The Lambda code and CDK IaC can be found &lt;a href="https://github.com/mohdizzy/rust-node_kafka_perf" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Comparing test results
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F03f7cj68t47vmm5msb7b.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F03f7cj68t47vmm5msb7b.png" alt="Rust Kafka Producer" width="800" height="341"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Looking at the median p50 metric, the latency difference doesn’t seem significant. The p99 does paint a different picture though. Also, memory utilization showed quite the difference; with Rust taking around 70–90MB, and Node around 150–176MB (some times even more).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F99okltp8aluj3njyn1zy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F99okltp8aluj3njyn1zy.png" alt="Node Kafka Producer" width="800" height="342"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;However, I do want to highlight a few things before we crown Rust the supreme winner. Though both Lambdas are essentially doing the same thing, the underlying Kafka client used isn’t something I created. Depending on how those libraries are built, the overall performance can possibly vary. The Node Lambda requires two additional layers for instrumenting with Otel as I opted to auto-instrument and not manually set up the tracing code, unlike the Rust function. I do think that the auto-instrumenting could also affect the performance of the Node function and how the traces/metric data is collected &amp;amp; exported, maybe not significantly but nevertheless worth taking note of.&lt;/p&gt;

&lt;h2&gt;
  
  
  Switch to Rust?
&lt;/h2&gt;

&lt;p&gt;It’s clear that Rust offers great advantage with cost and performance but is it worth the steep learning curve? I think yes. Although, I have been playing around and exploring Rust on and off but am still no where close to being comfortable with it.&lt;/p&gt;

&lt;p&gt;While its great to use the best tool for the job, sometimes, it requires a little more thought. If you are looking to get going real quick with a new product development cycle, unless everyone on your backend team is proficient with Rust and can comfortably handle all curve balls that might come with the evolution of the product you are building, you might want to take a step back before going all-in. It is quite likely that some of the tools you use within the workflow are even yet to offer support for Rust.&lt;/p&gt;

&lt;h2&gt;
  
  
  Closing thoughts
&lt;/h2&gt;

&lt;p&gt;In the Serverless ecosystem, because Lambdas can be designed to handle isolated pieces of your application, it becomes a lot easier to mix and match. You could pick one of those functions having the highest throughput that also consumes the highest memory along with longer execution times and start with that for &lt;em&gt;Rustification&lt;/em&gt; (yes, it’s fine to call it that). In essence, you want to hand-pick those parts of the application that are performance-critical and then explore the feasibility of porting them to Rust.&lt;/p&gt;

&lt;p&gt;Though Rust’s ecosystem isn’t quite there yet when compared to the other languages, its community support and overall appeal is growing. And that’s a good sign to start getting your hands dirty and begin the journey sooner rather than later.&lt;/p&gt;

&lt;p&gt;If you have tried doing anything interesting with Rust, I’d love to hear it!&lt;/p&gt;

</description>
      <category>aws</category>
      <category>lambda</category>
      <category>rust</category>
      <category>kafka</category>
    </item>
    <item>
      <title>Guarantee message deliveries for real-time WebSocket APIs with Serverless on AWS</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Sun, 24 Nov 2024 12:51:32 +0000</pubDate>
      <link>https://forem.com/aws-builders/guarantee-message-deliveries-for-real-time-websocket-apis-with-serverless-on-aws-3dba</link>
      <guid>https://forem.com/aws-builders/guarantee-message-deliveries-for-real-time-websocket-apis-with-serverless-on-aws-3dba</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Real-time updates are crucial to all modern applications because maintaining a high-quality consumer experience is paramount. Traditionally, setting up a WebSocket infrastructure is tedious and can get difficult when things begin to scale.&lt;/p&gt;

&lt;p&gt;Fortunately, AWS recently rolled out a feature as part of their AppSync product offering where you can create a pure Serverless WebSocket API for real-time communication, and it’s called &lt;a href="https://aws.amazon.com/blogs/mobile/announcing-aws-appsync-events-serverless-websocket-apis/" rel="noopener noreferrer"&gt;AppSync Events&lt;/a&gt;. Its minimalist setup is quite appealing, however, there is one problem in general with WebSocket communication; &lt;strong&gt;message delivery guarantee&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;While the caller is listening for events, there are situations like for a mobile app to lose its data connectivity making it go offline and potentially losing out on messages that might have been sent during that period. When the subscriber is back online, we ideally want to ensure those messages get delivered. In this article, we’ll explore one of the ways of dealing with this problem in Serverless style using AWS.&lt;/p&gt;

&lt;h2&gt;
  
  
  Getting Started with Events API
&lt;/h2&gt;

&lt;p&gt;The setup is quite simple. Provisioning the API itself involves only two parts: choosing the Authorization mode(s) and defining the channel namespaces.&lt;/p&gt;

&lt;p&gt;Namespaces are dedicated channels you create by sub-paths and then allow clients to subscribe to specific channels (eg. /new-updates/sports/football) or a subset by using wildcards (eg. /new-updates/sports/*)&lt;/p&gt;

&lt;p&gt;Authorization can be defined at the API level and then overridden at the channel namespace level for publishers and subscribers.&lt;/p&gt;

&lt;p&gt;Once the API is provisioned, the real-time endpoint is used for subscribing to one of the defined channels. The HTTP endpoint is used by the publisher for pushing events into a channel.&lt;/p&gt;

&lt;h2&gt;
  
  
  Retry Message Delivery Flow
&lt;/h2&gt;

&lt;p&gt;The publisher is only responsible for pushing messages to specified channels. Whether that message reached some of the subscribers or none of the subscribers is not the publisher's concern. For mission-critical applications, not knowing if the published message reached the intended recipient or not can be unsettling. Naturally, losing messages is not ideal, so one possible way to deal with this would be to let your backend systems know when messages are successfully received by your subscribers.&lt;/p&gt;

&lt;p&gt;In the flow indicated above, the first part is to deliver the message. The second portion is where the client calls an HTTP endpoint to acknowledge the receipt of that message. If the acknowledgment is not received within a minute, we publish the same payload to the same channel and continue to do so for the next ten minutes with the hope that the client is back online and receives the message.&lt;/p&gt;

&lt;p&gt;Here is a quick summary of the technical aspects involved in the flow,&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Publish the payload with a UUID included as part of it. UUIDv7 has the advantage of being time-ordered, which is great as it might help in situations where the ordering of messages can be important.&lt;/li&gt;
&lt;li&gt;Create a schedule using the Eventbridge Scheduler service with the name being the UUID itself. The target is another Lambda function with the interval being 1 minute and expiring in the next 10 minutes with auto delete enabled.&lt;/li&gt;
&lt;li&gt;Store the published payload in S3 with the object name being the UUID itself. The reason for having S3 in the picture is for larger payloads. Most Serverless components within AWS have 256KB payload limits which might be restrictive in some situations. The Events API supports payload sizes up to &lt;em&gt;1.2MB&lt;/em&gt;!&lt;/li&gt;
&lt;li&gt;The client after receiving the message, calls an HTTP endpoint with the UUID included in the request body. A Lambda uses that UUID to remove the schedule from Eventbridge thereby preventing the retry flow from moving forward.&lt;/li&gt;
&lt;li&gt;If the acknowledgment hasn’t come through, the Scheduler will invoke the retrier Lambda every minute. The Scheduler name is the UUID which also is the name of the object in S3 with which the Lambda can retrieve the payload and push it to the Channel again.&lt;/li&gt;
&lt;li&gt;The Scheduler is set to end in 10 minutes as we don’t want to keep publishing for an extended period.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here is the Github &lt;a href="https://github.com/mohdizzy/appsync-events-retry" rel="noopener noreferrer"&gt;link&lt;/a&gt; to the complete setup. I’ve deployed the Events API infrastructure using the Serverless framework via vanilla Cloudformation syntax. I assume the support for AWS CDK currently might be limiting given the service was only recently rolled out.&lt;/p&gt;

&lt;h2&gt;
  
  
  Closing thoughts
&lt;/h2&gt;

&lt;p&gt;The above flow works fine when we have one subscriber for a specific channel. If there are multiple subscribers, then ensuring deliveries for all of them might require some adjustments to the flow. The changes would need to be something along the lines of maintaining schedules per subscriber implying that the publisher needs to know in advance all the relevant subscribers.&lt;/p&gt;

&lt;p&gt;If the published payload is less than 256KB, an alternate way to approach the retry flow would be to use DynamoDB and SQS. DynamoDB would hold all published UUIDs along with a flag. Each published message would get pushed into FIFO SQS having a delay period of 1 minute or so. The Lambda consuming the queue would first check the acknowledgment flag against the UUID in DynamoDB, if false, it would publish the payload again. With SQS batch processing enabled and utilizing the partial batch failures feature, we could set up retries to 2 or any other number as needed.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F0nkv54dsxw158rnbazvi.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F0nkv54dsxw158rnbazvi.png" alt="Alternate flow using SQS,DynamoDB" width="800" height="333"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To allow the use of the retrier Lambda across multiple APIs, we should logically place the endpoint, namespace, and other metadata within the Scheduler service payload so that the retrier flow can behave like a common piece across all services utilizing the WebSocket flow.&lt;/p&gt;

</description>
      <category>serverless</category>
      <category>appsync</category>
      <category>websocket</category>
      <category>lambda</category>
    </item>
    <item>
      <title>Dynamic rule processing for data streams using Flink &amp; Serverless on AWS</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Wed, 07 Aug 2024 16:12:13 +0000</pubDate>
      <link>https://forem.com/aws-builders/dynamic-rule-processing-for-data-streams-using-flink-serverless-on-aws-487f</link>
      <guid>https://forem.com/aws-builders/dynamic-rule-processing-for-data-streams-using-flink-serverless-on-aws-487f</guid>
      <description>&lt;blockquote&gt;
&lt;p&gt;The write up below requires some working knowledge of Apache Flink along with basic understanding of AWS. Apache Flink is a Complex Event Processing (CEP) framework primarily used for working with continuous data streams at hyperscale offering tools to work with a wide variety of use cases. &lt;br&gt;
If you’re new to Flink and do not have the vaguest idea, I’d recommend reading &lt;a href="https://dev.to/aws-builders/serverless-complex-event-processing-with-apache-flink-3dbm"&gt;this&lt;/a&gt; piece first.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  The what &amp;amp; why
&lt;/h2&gt;

&lt;p&gt;Pattern detection is one of the areas Flink excels at scale. When working with data streams, there could be use cases wherein the incoming event must be subjected to a set of rules to determine if it needs to be processed further by downstream systems. These rules are usually simple JSON payloads.&lt;/p&gt;

&lt;p&gt;Alternatively, let’s say there is some kind of data enrichment required for specific events within the source stream. The data used for the enrichment is static and may be altered dynamically over time by pushing a new dataset.&lt;/p&gt;

&lt;p&gt;Flink enables achieving both the above use cases with the Broadcast pattern.&lt;/p&gt;

&lt;h2&gt;
  
  
  How Broadcast works
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Broadcasting&lt;/strong&gt; on Flink is the idea of pushing a payload (set of rules/items) and making it available to all data streams that you might have within your Flink application.&lt;/p&gt;

&lt;p&gt;Flink maintains the internal state for a Broadcast stream. Every time an event is pushed into the source topic receiving the events intended for the Broadcast stream, the state is updated (overwritten or appended based on business logic) automatically.&lt;/p&gt;

&lt;p&gt;If new “rules” are to be appended with the existing Broadcast message payload, then there are two ways to achieve this,&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The first is to push a combined message payload to the source topic containing all the relevant attributes within the single payload. This requires no code adjustment within Flink and only requires the source to handle the payload that’s being pushed.&lt;/li&gt;
&lt;li&gt;The second approach is to update the Broadcast state manually at the code level within Flink by appending the state from the incoming payload to the existing state.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Both approaches work well with their own merits &amp;amp; demerits. It should be noted that whenever the state is updated, all operators consuming the Broadcast stream will have access to the same state.&lt;/p&gt;

&lt;h2&gt;
  
  
  Flow setup on AWS
&lt;/h2&gt;

&lt;p&gt;The diagram above depicts a simple flow using AWS resources. Kafka (AWS MSK) acts as the message broker. Kinesis Data Streams can also be used as the source and sink with the appropriate connector configuration within Flink. The Flink job is deployed into the Managed Service for Apache Flink (MSF).&lt;/p&gt;

&lt;p&gt;The rules are ingested via an upload to the S3 bucket. This bucket acts as a source for a Lambda function which transforms the CSV file(s) into a JSON payload before pushing it to a Kafka topic. This is the source topic for Flink to receive the message/rules for Broadcasting. Of course, the delivery of the message to the Kafka topic can be done in multiple ways. For instance, an API Gateway + Lambda could also work well.&lt;/p&gt;

&lt;p&gt;Once the message (rule) gets ingested, it will be applied to every event that comes through Flink from the source event topic, processed based on the business logic, and pushed out to the sink (Kafka topic).&lt;/p&gt;

&lt;h2&gt;
  
  
  Under the hood
&lt;/h2&gt;

&lt;p&gt;At a high level, the data stream setup in the Flink job looks something like this,&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// source stream
  DataStream&amp;lt;String&amp;gt; sourceStream = env.fromSource(eventSource, WatermarkStrategy.noWatermarks(),
    "event-source");

  // ETD rules stream
  DataStream&amp;lt;String&amp;gt; ruleStream = env.fromSource(ruleEventSource, WatermarkStrategy.noWatermarks(),
    "rule-source");

  // Broadcast state descriptor
  MapStateDescriptor&amp;lt;String, String&amp;gt; broadcastStateDescriptor =
    new MapStateDescriptor&amp;lt;&amp;gt;("broadcastState", String.class, String.class);

  // Broadcast stream
  BroadcastStream&amp;lt;String&amp;gt; broadcastStream = ruleStream.broadcast(broadcastStateDescriptor);

  SingleOutputStreamOperator&amp;lt;String&amp;gt; flightDelayCompensationStream = sourceStream.connect(broadcastStream)
    .process(new EventBroadcastProcessFunction(broadcastStateDescriptor)).uid("rule-processor");
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Depending on the source event stream being &lt;em&gt;keyed&lt;/em&gt; or not, the appropriate Broadcast processor class needs to be extended, which is either the &lt;em&gt;KeyedBroadcastProcessFunction&lt;/em&gt; or &lt;em&gt;BroadcastProcessFunction&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;The setup below has a non-keyed stream, therefore the methods belonging to the regular BroadcastProcessFunction class are overridden.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector&amp;lt;String&amp;gt; out) throws Exception
    {
        {
            try
            {
                ReadOnlyBroadcastState&amp;lt;String, String&amp;gt; broadcastState = ctx.getBroadcastState(broadcastStateDescriptor);
                String rules = broadcastState.get("rules");

                //business logic

                out.collect(value);
            }
            catch (Exception e)
            {
                logger.error("Something went wrong"+e.toString())
            }

        }
    }

    @Override
    public void processBroadcastElement(String rule, Context ctx, Collector&amp;lt;String&amp;gt; collector) throws Exception
    {
        final String uuid = UUID.randomUUID().toString();
        ctx.getBroadcastState(broadcastStateDescriptor).put("rules", rule);
    }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;em&gt;processBroadcastElement&lt;/em&gt; function is where the Broadcast state can manipulated. Each time a payload comes through the Broadcast message source, we could append the values to the existing state or overwrite as needed.&lt;/p&gt;

&lt;p&gt;The Broadcast state can be accessed within the &lt;em&gt;processElement&lt;/em&gt; function and that’s where the actual source event comes through. After reading from the state, the rules can then be applied to the business logic for every incoming event.&lt;/p&gt;

&lt;h2&gt;
  
  
  Running it locally
&lt;/h2&gt;

&lt;p&gt;You can get this up and running in your local environment with docker. This is the GitHub &lt;a href="https://github.com/mohdizzy/flinkbroadcast" rel="noopener noreferrer"&gt;link&lt;/a&gt; to the Flink application including the docker-compose file. To make it easy, the docker file already includes the Kafka topics creation utilized within the Flink job.&lt;/p&gt;

&lt;p&gt;These are steps you would need to follow,&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Build the JAR file.&lt;/li&gt;
&lt;li&gt;Install docker.&lt;/li&gt;
&lt;li&gt;Run the docker command — &lt;code&gt;docker-compose up -d&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Go to &lt;a href="http://localhost:8081" rel="noopener noreferrer"&gt;http://localhost:8081&lt;/a&gt; and submit a job with the JAR file created in step 1.&lt;/li&gt;
&lt;li&gt;Connect to the Kafka container — &lt;code&gt;docker container exec -it &amp;lt;cluster-name&amp;gt; /bin/bash&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Push messages to the appropriate Kafka topic to monitor the flow — &lt;code&gt;kafka-console-producer — broker-list localhost:9092 — topic &amp;lt;name&amp;gt;&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;The above solution works great when there are a finite number of rules, although there is no direct restriction on the amount of data Flink can hold in the internal state. It’s the performance and code management that needs to be taken into consideration.&lt;/p&gt;

&lt;p&gt;There could be an argument about using a Lambda function instead of Flink to achieve the same result. While that makes complete sense in terms of simplicity, when real-time processing at a heavy scale is paramount, Flink is the better choice.&lt;/p&gt;

&lt;p&gt;It is also possible to use two different sources and then subject the combined stream to the Broadcasted rules. Most complex scenarios are likely possible with Flink when compared to other forms of compute for data streams. It all boils down to what works best in the long run and where application management is simpler.&lt;/p&gt;

</description>
      <category>aws</category>
      <category>lambda</category>
      <category>apacheflink</category>
      <category>serverless</category>
    </item>
    <item>
      <title>Executing long running tasks with AppSync</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Sun, 02 Jun 2024 12:11:57 +0000</pubDate>
      <link>https://forem.com/aws-builders/executing-long-running-tasks-with-appsync-1ikk</link>
      <guid>https://forem.com/aws-builders/executing-long-running-tasks-with-appsync-1ikk</guid>
      <description>&lt;p&gt;AWS recently &lt;a href="https://aws.amazon.com/about-aws/whats-new/2024/05/aws-appsync-events-asynchronous-lambda-function-invocations/" rel="noopener noreferrer"&gt;announced&lt;/a&gt; a small (but notable) feature for the AppSync service where a data source associated with a Lambda can invoke the function asynchronously. Before this, AppSync was to only process all requests synchronously.&lt;/p&gt;

&lt;h2&gt;
  
  
  How does it help
&lt;/h2&gt;

&lt;p&gt;The current update opens the door to handling certain situations which weren’t possible earlier (or at least they were not as straight-forward to implement).&lt;/p&gt;

&lt;p&gt;Imagine when you have specific AppSync mutations that take more than 30 seconds to process owing to technical constraints. Since you always need to return a synchronous response, the way to deal with this would typically involve offloading the actual processing of the request to another function either by passing the payload via an SQS (provided payload is under 256KB) or doing a direct asynchronous invocation of the other lambda function and then returning a generic response by the Lambda function associated with the resolver. By offloading the actual processing to another function, the 30 second timeout limitation has been handled.&lt;/p&gt;

&lt;p&gt;Assuming the caller needs the final result of that long-running task, it would also need to make a subscription call so that it can receive the response once it has been completed. Triggering a mutation call (without an actual data source) is needed to deliver the subscription response.&lt;/p&gt;

&lt;p&gt;Specifying “Event” invocation method type now allows calling the function asynchronously. So now the additional step of handing off the actual processing through an SQS/Lambda can be eliminated.&lt;/p&gt;

&lt;h2&gt;
  
  
  The setup
&lt;/h2&gt;

&lt;p&gt;With in the JS resolver request function, all we need to do is specify the invocationType attribute. You would already be familiar with this when a function is invoked using the AWS SDK/CLI.&lt;/p&gt;

&lt;p&gt;The response function of the resolver can return a static response to indicate to the caller that AppSync has received the request for processing.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;export function request(ctx) {
    return {
      operation: "Invoke",
      invocationType: "Event",
      payload: ctx,
    };
}

export function response(ctx) {
  return "Received request";
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Things to keep in mind
&lt;/h2&gt;

&lt;p&gt;Asynchronous lambda invocations cannot have payload sizes more than 256KB. So if your existing synchronous AppSync request has a payload size beyond this value, switching to async mode is not going to be possible.&lt;/p&gt;

&lt;p&gt;Likewise, for the response, an AppSync request (query/mutation) can return a payload with a size limit of 5MB.&lt;/p&gt;

&lt;p&gt;The flow indicated above assumes the caller needs the result after the long-running task has finished processing. Eventbridge pipes is a great tool leveraging AWS infrastructure for triggering the mutation call whose response is the result for a Subscription request.&lt;/p&gt;

&lt;p&gt;With the use of Subscriptions we are able to deliver the final payload, but the payload size cannot exceed 240 KB. This isn’t something new but just to keep in mind that even though AWS enabled asynchronous request processing with AppSync, the final act of delivering a large payload is still a catch.&lt;/p&gt;

</description>
      <category>lambda</category>
      <category>eventbridge</category>
      <category>appsync</category>
      <category>serverless</category>
    </item>
    <item>
      <title>Setting up AppSync subscriptions for out-of-band updates with Eventbridge pipes</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Sun, 07 Jan 2024 06:34:58 +0000</pubDate>
      <link>https://forem.com/aws-builders/setting-up-appsync-subscriptions-for-out-of-band-updates-with-eventbridge-pipes-249a</link>
      <guid>https://forem.com/aws-builders/setting-up-appsync-subscriptions-for-out-of-band-updates-with-eventbridge-pipes-249a</guid>
      <description>&lt;h2&gt;
  
  
  Introduction:
&lt;/h2&gt;

&lt;p&gt;This short article talks about handling a specific scenario with AppSync when it comes to dealing with sending real-time updates for your event-driven serverless applications using GraphQL. If you would like to, more on GraphQL subscriptions can be explored here.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;&lt;strong&gt;Subscription&lt;/strong&gt;&lt;/em&gt; events via AppSync especially for those updates that are not triggered through a direct mutation call (calls made by your system resulting in data changes) has been addressed previously if you were to do a quick Google check. In short, the general working is that a subscription response is returned when the relevant mutation call is made. So for out-of-band updates, after triggering a mutation call that does nothing, we’re able to return the response data to the subscriber. How that mutation call is made depends on your flow and the way it fits in your event-driven way of doing it efficiently.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;In this article, I just show how we could go one step further and implement the whole flow with no code (without a Lambda function) using Eventbridge Pipes!&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;&lt;strong&gt;&lt;em&gt;Eventbridge&lt;/em&gt;&lt;/strong&gt; &lt;em&gt;&lt;strong&gt;Pipes&lt;/strong&gt;&lt;/em&gt;, if you’re unfamiliar with it, enables connecting point-to-point integrations between your source and target with added capabilities for filtering and enrichment flows without needing Lambda(s) in between for those operations. In a distributed event-driven system, Pipes can be a great tool to reduce overhead and streamline flows considerably.&lt;/p&gt;

&lt;h3&gt;
  
  
  Flow set-up:
&lt;/h3&gt;

&lt;p&gt;Let’s say there is a requirement to send subscription updates for a specific product ID whenever a new order is created by the system. All new orders are persisted into a DynamoDB table, and this table will be our source for triggering the flow using Pipes.&lt;/p&gt;

&lt;p&gt;DynamoDB streams is what we will configure as the source for our pipe.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fonxi6whis3dsdypxkko3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fonxi6whis3dsdypxkko3.png" alt="Pipe Source" width="800" height="481"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Since we’re interested in processing only those events that have been inserted for the first time into the table, we will set up the filtering to look for “INSERT” event type only.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fl8740ba7znnt7varnrxl.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fl8740ba7znnt7varnrxl.png" alt="Filtering for INSERT events only" width="800" height="476"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;There is no specific requirement for enrichment, so we can move on to setting up our target, which is API Destination. Note that the AppSync API and schema would need to be already created before configuring the API destination.&lt;/p&gt;

&lt;p&gt;It’s here where we will need to provide the AppSync endpoint and set up the payload transformation to build a mutation request which will be used to hit the endpoint.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmqorp1uucmqf5ofczqk6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmqorp1uucmqf5ofczqk6.png" alt="Target destination for the Pipe set to the AppSync API" width="800" height="468"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6nn9fpf2iefhq4epx2kx.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6nn9fpf2iefhq4epx2kx.png" alt="Using the DynamoDB stream event payload to build the mutation request" width="800" height="435"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Our GraphQL schema will look something like this. (For simplicity, I have set this up to only focus on the subscription flow and nothing else)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;schema {
  mutation: Mutation
  query: Query
  subscription: Subscription
}

type Mutation {
  createOrder(createOrderInput: CreateOrderInput!): OrderResponse
}

type Query {
  test(id:String): String
}

type Subscription {
  onOrderCreate(productId: String!): OrderResponse
    @aws_subscribe(mutations: ["createOrder"])
}

input CreateOrderInput {
  userId: String!
  orderId: String!
  productId: String!
}

type OrderResponse {
  productId: String
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And that’s pretty much it! You can do a quick test by making a subscription call at the AppSync console by providing a product ID in the input, and then insert a record at DynamoDB containing the same product ID value. The response should be immediately reflected in the console.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpbsb1asukvcmcnvxfjf8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpbsb1asukvcmcnvxfjf8.png" alt="Subscription response when the mutation call is made via Eventbridge" width="800" height="430"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/mohdizzy/appsync-pipes" rel="noopener noreferrer"&gt;Here&lt;/a&gt; is the complete repository set-up for reference.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Quick tip: When working with pipes, first set up the flow manually at the console and test it out. You can then export the cloudformation template from the console and add it to your serverless.yml file. This way you don’t spend time writing the IaC code for it!&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  Conclusion:
&lt;/h3&gt;

&lt;p&gt;Pipes is a great tool to work with especially when you need to connect different AWS services to talk to each other and perform payload transformations in between. Pipes support for sources is mostly with streams but the target list is comparatively extensive. The usage of pipes makes it conducive to set up event-driven systems that are consistent across your applications, and therefore is worth exploring the possibility of incorporating it within your services.&lt;/p&gt;

</description>
      <category>aws</category>
      <category>serverless</category>
      <category>eventbridge</category>
      <category>appsync</category>
    </item>
    <item>
      <title>Self-optimize Lambda memory configuration(s) at scale with Flink</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Sat, 28 Oct 2023 07:34:38 +0000</pubDate>
      <link>https://forem.com/aws-builders/self-optimize-lambda-memory-configurations-at-scale-with-flink-54g6</link>
      <guid>https://forem.com/aws-builders/self-optimize-lambda-memory-configurations-at-scale-with-flink-54g6</guid>
      <description>&lt;h2&gt;
  
  
  What’s this and why do it?
&lt;/h2&gt;

&lt;p&gt;Let’s say you want to optimize the memory allocated for your lambda function because you had over-provisioned it earlier as you didn’t know how much memory the code actually needs to run properly. To do this, logically, one would look at the Cloudwatch logs (or Lambda insights) to understand how much memory the function uses in an invocation and then sample them over time to find the “&lt;em&gt;sweet-spot&lt;/em&gt;”.&lt;/p&gt;

&lt;p&gt;This works well when you have a handful of lambdas to work with, but imagine if you need to optimize a thousand of them! What then? The manual approach is obviously time consuming. The set up I am going to explain here just automates this process at scale.&lt;/p&gt;

&lt;h2&gt;
  
  
  How it works:
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Access to lambda metrics:
&lt;/h3&gt;

&lt;p&gt;The first step of the flow is to access the memory utilization of a function over time in order to come up with a number that works well. The &lt;a href="https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html" rel="noopener noreferrer"&gt;telemetry API&lt;/a&gt; that’s part of Lambda extensions can help with this. We ship the platform logs so that the memory used across all invocations can be sampled over time.&lt;/p&gt;

&lt;h3&gt;
  
  
  Sampling the numbers:
&lt;/h3&gt;

&lt;p&gt;Now that you have access to the metrics (the maxMemory used), how do you go about using them to get an optimal value? This is where the CEP layer (Flink) comes into the picture. &lt;a href="https://nightlies.apache.org/flink/flink-docs-stable/" rel="noopener noreferrer"&gt;Flink&lt;/a&gt; is an open-source CEP framework from Apache.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;If you haven’t heard of Flink or don’t have a clear idea as to what it does, I encourage you to kindly read some of my previous blogs starting with this &lt;a href="https://mohdizzy.medium.com/serverless-complex-event-processing-with-apache-flink-60336d2abcd9" rel="noopener noreferrer"&gt;one&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;We gather the metrics of a lambda function over a 2 week stint and then determine the value to set by pushing a payload from Flink at the end of that period. Owing to how Flink operates internally, we are able to gather the logs of a function and keep track over a defined period individually at scale, and this is why I’ve employed the use of this compute layer.&lt;/p&gt;

&lt;p&gt;The process of collecting the metrics and determining the final value to push as output is all programmed within this Flink layer.&lt;/p&gt;

&lt;h3&gt;
  
  
  Updating the memory value:
&lt;/h3&gt;

&lt;p&gt;When Flink pushes the payload, another lambda uses that information and &lt;em&gt;makes a call via the AWS SDK to update the lambda memory&lt;/em&gt; configuration of the function whose details were provided in the Flink payload.&lt;/p&gt;

&lt;h2&gt;
  
  
  The nitty-gritty details of the setup:
&lt;/h2&gt;

&lt;p&gt;Firstly, the entire project is available at this Github &lt;a href="https://github.com/mohdizzy/lambdaMemoryOptimizer" rel="noopener noreferrer"&gt;repo&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;You will only need to run three commands from the terminal to deploy the whole thing.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Note: You will need Node, Maven &amp;amp; Serverless framework installed to run them.&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;npm install
npm run flink-build
serverless deploy --max-concurrency 2
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  IaC:
&lt;/h3&gt;

&lt;p&gt;The deployment of the entire infrastructure is done via the Serverless framework. Since there are multiple items to deploy, the setup uses serverless compose to deploy everything by issuing one command.&lt;/p&gt;

&lt;p&gt;These are the resources provisioned as part of the deployment.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;S3 bucket — contains lambda layer and Flink application .jar file&lt;/li&gt;
&lt;li&gt;Lambda(s) — worker function that ships logs to Flink and the updater function which sets the new memory value&lt;/li&gt;
&lt;li&gt;Kinesis Data Analytics — flink instance&lt;/li&gt;
&lt;li&gt;Lambda layer — extensions code that plugs into the worker function for delivering the logs&lt;/li&gt;
&lt;li&gt;Kinesis streams — source stream for pushing logs to Flink and destination stream which receives the outputs from Flink&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Shipping logs:
&lt;/h3&gt;

&lt;p&gt;As mentioned, the code that utilizes the telemetry API is injected via a Lambda layer which is also present in the repository. The logs are pushed to a Kinesis stream but in theory, they can be shipped almost anywhere. Kinesis was used because it works well with Flink.&lt;/p&gt;

&lt;p&gt;Any lambda that needs to be part of this automated optimizing setup will have to include the lambda layer as part of its deployment.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Note: The code for the extensions set up was forked from AWS’s samples GitHub &lt;a href="https://github.com/aws-samples/aws-lambda-extensions/tree/main/nodejs-example-telemetry-api-extension" rel="noopener noreferrer"&gt;repo&lt;/a&gt;.&lt;/em&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Flink:
&lt;/h3&gt;

&lt;p&gt;At a high level, this is what Flink is doing internally,&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;All logs contain the function name (this was added manually within the extensions dispatcher code). This function name is used to perform keyBy operation for every log event that comes through.&lt;/li&gt;
&lt;li&gt;The keyed stream within Flink allows state maintenance for every function individually.&lt;/li&gt;
&lt;li&gt;A timer is created from the very first instance of a given function log that is triggered exactly after 14 days. Within this period, all Flink will do is determine the highest value of maxMemory used for a given day based on all the logs that get pushed for that day, and keep that value in state.&lt;/li&gt;
&lt;li&gt;When the timer is fired, the average of all memory values is present in the state calculated. Just as an added buffer, the average value is incremented by 100 and then pushed out to the destination Kinesis stream from Flink.&lt;/li&gt;
&lt;li&gt;Of course, all of the above can be altered to what works best for you.&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;When it comes to Flink, we can even use Kafka as source/destination topics. I went with Kinesis for a nimble set up.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  Memory updater:
&lt;/h3&gt;

&lt;p&gt;The consumer of the destination Kinesis stream is just a lambda function whose role is to read the payload containing the function name and memory value and make the memory configuration update using the AWS SDK.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion:
&lt;/h2&gt;

&lt;p&gt;Cost optimization with serverless architectures require adequate monitoring &amp;amp; observability to make optimal decisions in terms of resource configuration. The above set up merely handles one aspect of that concept with lambda functions.&lt;/p&gt;

</description>
      <category>flink</category>
      <category>lambda</category>
      <category>telemetry</category>
      <category>aws</category>
    </item>
    <item>
      <title>Optimize your Serverless architectures with event filtering</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Wed, 19 Apr 2023 07:20:13 +0000</pubDate>
      <link>https://forem.com/aws-builders/optimize-your-serverless-architectures-with-event-filtering-1dab</link>
      <guid>https://forem.com/aws-builders/optimize-your-serverless-architectures-with-event-filtering-1dab</guid>
      <description>&lt;p&gt;More often than not when working in a Serverless ecosystem with internal AWS services such as SQS, SNS, Kinesis etc. , we unintentionally tend not to leverage the filtering capabilities of those services which can reduce boilerplate code within your lambda functions, and also attribute to performance improvements.&lt;/p&gt;

&lt;h2&gt;
  
  
  Why do this?
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Reduced logic:&lt;/strong&gt; The most obvious benefit is reduced code in your lambda functions.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Cost:&lt;/strong&gt; The second most beneficial aspect is cost. If your source is sending a huge catalog of event types, and you are interested only in processing a fraction of them, then implementing event filtering is more of a necessity than an added feature.&lt;/p&gt;

&lt;p&gt;In this article, we will take a look at each of those AWS services which allow message filtering in brief and see how to quickly implement them for your application’s use case. Of course, it’s essential to understand the incoming payload structure in advance for the filtering to work effectively else you would only be discarding messages without ever invoking the function even for the ones you are actually interested in.&lt;/p&gt;

&lt;p&gt;All lambda filtering patterns for supported AWS services allow the following type of checks&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3nbe670kbguz7rdt6s7e.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3nbe670kbguz7rdt6s7e.png" alt="Sourced from AWS documentation" width="800" height="349"&gt;&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
   "meta":{
      "triggerEventLog":{
         "id":"55b8826e35adc2ba-471f5164b8bc6221"
      }
   },
   "temperature":{
      "type":"celsius"
   },
   "version":"1.11.0"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Note that the syntax (to be added under your specific function) provided is for the Serverless framework but things should work in a similar way for most IaC frameworks.&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  SQS:
&lt;/h2&gt;

&lt;p&gt;SQS accepts plain text and JSON based filtering patterns. Notice that filterPatterns is an array, so you can add at the most 5 patterns per event source (as per AWS docs).&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Side note: If you are looking to understand on a deeper level how to process messages from an SQS queue at a massive scale, do give &lt;a href="https://aws.amazon.com/pt/blogs/apn/understanding-amazon-sqs-and-aws-lambda-event-source-mapping-for-efficient-message-processing/?advocacy_source=everyonesocial&amp;amp;trk=global_employee_advocacy&amp;amp;sc_channel=sm&amp;amp;es_id=54c69f1ffe&amp;amp;es_id=e547eca23c" rel="noopener noreferrer"&gt;this&lt;/a&gt; piece a read!&lt;br&gt;
&lt;/p&gt;
&lt;/blockquote&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    events:
      - sqs:
          arn: arn:aws:sqs:us-east-1:xxxx:filterTest
          filterPatterns:
            - body: {"temperature":{"type": ["celsius"]}}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Kafka:
&lt;/h2&gt;

&lt;p&gt;If your application uses Kafka (Amazon MSK) as a message broker, event filtering is possible with Kafka topics too. The syntax and the working are quite similar to SQS, the variation here is the “value” key is used instead of “body” as the kafka message in the incoming payload is present within the value key.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    events:
      - stream: 
          arn: arn:aws:kinesis:us-east-1:xxx:cluster/MyCluster/xxx
          topic: myTopic
          filterPatterns:
           - value:
              temperature: 
                type: ["celsius"] 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Kinesis streams:
&lt;/h2&gt;

&lt;p&gt;Filtering Kinesis data streams operates in the same fashion as SQS/MSK topics. The variation being with the “data” key.&lt;/p&gt;

&lt;p&gt;The amazing thing about filtering Kinesis/MSK events is that the incoming payload is always base64 encoded, however, AWS internally decodes and performs the filtering for you.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    events:
      - stream: 
          arn: arn:aws:kinesis:us-east-1:xxx:stream/filterTest
          filterPatterns:
           - data:
              temperature: 
                type: ["celsius"]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  DynamoDB Streams:
&lt;/h2&gt;

&lt;p&gt;Any changes to a record within the dynamodb table will trigger an event to the stream (when enabled). If your application is interested in processing only when there are insert operations being performed, then having an event filter can definitely reduce the number of invocations for your function.&lt;/p&gt;

&lt;p&gt;Since the lambda event that comes from a dynamodb stream invocation has a predefined structure, logically, the filtering needs to conform to that structure.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Sample event structure for reference&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The filter you see below has been set up to look for those insert operations when the temperature column has value starting with celsius. eg celsius#100&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    events:          
      - stream:
          type: dynamodb
          arn: stream_arn
          filterPatterns:
            - eventName: 'INSERT' # [INSERT|MODIFY|REMOVE] 
              dynamodb:
                Keys: # NewImage|OldImage|Keys
                  temperature: # name of field
                    S: [{"prefix":"celsius"}] # S for String, N for number, etc.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  SNS:
&lt;/h2&gt;

&lt;p&gt;Unlike SQS and other services, event filtering with SNS works differently.&lt;/p&gt;

&lt;p&gt;Every subscriber to an SNS topic can have its own filtering policy based on&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;MessageBody&lt;/li&gt;
&lt;li&gt;MessageAttributes&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In contrast to event filtering with other AWS services, SNS doesn’t allow nested event filtering, i.e. if you wish to filter a JSON payload that is nested within the object(s), SNS will not be able to filter such messages. However, the base filter concepts such as numeric (equals/range) and string (null/equals/begins with/exists/not exist) checks will still work just like other event sources.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    events:
      - sns:
          arn: arn:aws:sns:us-east-1:xxx:filterTest
          filterPolicyScope: MessageBody
          filterPolicy:
            version: 
              - "1.11.0"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  API Gateway:
&lt;/h2&gt;

&lt;p&gt;Request validation is a great feature that AWS provides which can be used to reject those payloads that do not conform to an API’s specification. With this setup, a lot of the elementary validation can be off loaded to the API gateway level. It not only allows the filtering of non-conformant payloads but also acts as a security barrier for any malicious request.&lt;/p&gt;

&lt;p&gt;The specifications of your API schema can be set up this way,&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "definitions": {},
    "$schema": "http://json-schema.org/draft-04/schema#",
    "type": "object",
    "title": "Filtering temperature sensor events",
    "required": ["temperature","version"],
    "properties": {
      "version": {
        "type": "string",
        "pattern": "^\d{1}\.\d{2}\.\d{1}$"
      },
      "temperature":{
        "type": "object",
        "properties": {
          "type": {
            "type":"string",
            "minLength": 1,
            "maxLength": 60
          }
        }
      },
      # Other examples of possible checks
      "price": { "type": "number", "minimum": 25, "maximum": 500 },
      "type":  { "type": "string", "enum": ["sensor1", "sensor2", "sensor3"] }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Syntax for &lt;em&gt;serverless.yml&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    events:
      - http:
          path: /temperature
          method: post   
          request:
            schema:
              application/json: ${file(valid_request.json)}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Going a step ahead in filtering:
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fww5jgnfimxk5ybk7x66u.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fww5jgnfimxk5ybk7x66u.png" alt="Eventbridge Pipes" width="800" height="131"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Pipes&lt;/strong&gt; is a feature that was recently introduced by AWS as part of Eventbridge that allows filtering, enriching, and delivery of that payload to a destination of your choice.&lt;/p&gt;

&lt;p&gt;To provide an overview of the four sections (filtering &amp;amp; enrichment are optional),&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Sources can be: SQS, DynamoDB, MSK, Self managed Kafka, Amazon MQ.&lt;/li&gt;
&lt;li&gt;The same filtering rules as above apply here too.&lt;/li&gt;
&lt;li&gt;If your filtered payload requires enrichment by calling external sources, that can be achieved by using: a lambda, API gateway, any external API, Step function workflow.&lt;/li&gt;
&lt;li&gt;The target list is quite extensive. It includes all the serverless components you can think of and more!&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This a great way tool to streamline data processing for Serverless workloads in an efficient manner. For instance, if the data source is the same but the event types are varied, you could essentially create dedicated “pipes” for each event that may require a different filter and enrichment process before it reaches the destination.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion:
&lt;/h2&gt;

&lt;p&gt;With the above items, we’ve covered all the event sources in AWS that support event/request filtering.&lt;/p&gt;

&lt;p&gt;Something to keep in mind, a queue usually has 1 consumer only, therefore you may not have the flexibility of having multiple filtering patterns. However, event sources like Kinesis, MSK, DynamoDB streams can have multiple consumers listening to the same stream, and hence each of those consumers can have independent filter patterns depending on the specific event types they are interested in processing.&lt;/p&gt;

&lt;p&gt;It’s definitely worth setting up event filtering where possible if you are looking to fine tune performance and costs with your Serverless architecture.&lt;/p&gt;

</description>
      <category>aws</category>
      <category>serverless</category>
      <category>filtering</category>
      <category>lambda</category>
    </item>
    <item>
      <title>Leverage Flink Windowing to process streams based on event time</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Sun, 04 Sep 2022 09:19:24 +0000</pubDate>
      <link>https://forem.com/aws-builders/leverage-flink-windowing-to-process-streams-based-on-event-time-4mlo</link>
      <guid>https://forem.com/aws-builders/leverage-flink-windowing-to-process-streams-based-on-event-time-4mlo</guid>
      <description>&lt;p&gt;The concepts explained in this article require working knowledge of Flink. If you have no prior experience, I'd recommend giving a read to one of my previous articles that explore the &lt;a href="https://mohdizzy.medium.com/serverless-complex-event-processing-with-apache-flink-60336d2abcd9" rel="noopener noreferrer"&gt;fundamental concepts&lt;/a&gt; of Flink.&lt;/p&gt;

&lt;h2&gt;
  
  
  What's Windowing and why you might need it:
&lt;/h2&gt;

&lt;p&gt;Imagine splitting an unbounded stream of events and processing them as individual buckets (or sets). You can perform computations on these individual sets of events to derive a meaningful event stream flowing out of Flink. There are a variety of use cases that you can achieve by making use of the operators/functions that Flink provides as part of its framework. The process of figuring out how to implement and chain those operators together rests with you. In this article, we'll explore the basics of windowing operator and how you can process out-of-order events.&lt;/p&gt;

&lt;h2&gt;
  
  
  Types of windows:
&lt;/h2&gt;

&lt;p&gt;Event streams may be keyed/non-keyed, and hence this factor will decide whether the windowing computation will occur in parallel across multiple tasks or in a single task.&lt;/p&gt;

&lt;p&gt;Windows can be of 4 types:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Tumbling windows - Non-overlapping processing of events with fixed time duration (aka window size).&lt;/li&gt;
&lt;li&gt;Sliding windows - Similar to tumbling windows with fixed window size with the ability to decide when the next window starts. This allows overlapping windows to share data between them.&lt;/li&gt;
&lt;li&gt;Session windows - In contrast to tumbling and sliding windows, session windows don't have a fixed window size but instead rely on the period of inactivity (defined by you) since the last event receipt to end that window, and then start a new window for subsequent events.&lt;/li&gt;
&lt;li&gt;Global windows - A global windows assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation will be performed, as the global window does not have a natural end at which we could process the aggregated elements.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Setting up the Flink Job:
&lt;/h2&gt;

&lt;p&gt;For the purposes of an example, we look at processing events based on the event's time.&lt;/p&gt;

&lt;p&gt;Before using the window operator/assigner, the source stream needs a &lt;em&gt;WatermarkStrategy&lt;/em&gt;. There are two watermark generators:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;em&gt;forMonotonousTimestamps()&lt;/em&gt; - To be used when it is known that the arriving events will always be in order.&lt;/li&gt;
&lt;li&gt;
&lt;em&gt;forBoundedOutOfOrderness()&lt;/em&gt; - If the events are known to be out-of-order, a certain degree of lateness will be tolerated. This value can be set (in minutes/seconds/milliseconds) which will allow events after comparing with the current timestamp. If an event falls outside the timeframe that you have set for lateness, it is discarded from the stream (unless the allowedLateness value is set beyond this value).&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;There are different kinds of watermarking strategies that you can employ depending on your specific needs. To explore more, refer to this &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#late-elements" rel="noopener noreferrer"&gt;documentation&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Using the &lt;em&gt;withTimestampAssigner()&lt;/em&gt; function, you can then extract the timestamp value that is embedded in the event's body. Note that both timestamps/watermarks are specified as milliseconds since the Java epoch of 1970–01–01T00:00:00Z.&lt;/p&gt;


&lt;div class="ltag_gist-liquid-tag"&gt;
  
&lt;/div&gt;


&lt;p&gt;Here is a breakdown of the above snippet,&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;You first watermark every event that comes in from the source.&lt;/li&gt;
&lt;li&gt;The timestamp needs to be extracted from the incoming event and converted to milliseconds.&lt;/li&gt;
&lt;li&gt;Convert the stream to a keyed stream and place the window operator. Observe that the &lt;em&gt;TumblingEventTimeWindows&lt;/em&gt; function is used since we deal with the event's time and not processing time.&lt;/li&gt;
&lt;li&gt;Notice we've also used the optional function &lt;em&gt;allowedLateness&lt;/em&gt; that allows us to process events that may come beyond the limit set with &lt;em&gt;forBoundedOutOfOrderness()&lt;/em&gt;. This essentially allows us to process those events that arrived late but in a separate window in contrast to the window it could have been part of had it arrived in the correct sequence.&lt;/li&gt;
&lt;li&gt;Before we move to the next step, we can optionally place a &lt;em&gt;trigger()&lt;/em&gt; and &lt;em&gt;evictor()&lt;/em&gt; right after the window assigner function if your use case requires such functionality. Strategic use of these functions can help solve unique scenarios, so it's worth exploring them before employing some other custom logic.&lt;/li&gt;
&lt;li&gt;Finally, the last piece is the process function where you can place your custom windowing logic for those sets of events. This is quite similar to the generic process function except the events here are &lt;em&gt;Iterable&lt;/em&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Deploying Flink apps as Serverless:
&lt;/h2&gt;

&lt;p&gt;After your application logic is ready, running the Flink job is straight forward with AWS Kinesis Data analytics. You drop the built .jar file into an S3 bucket, create a new Flink application by pointing to that S3 bucket and that's it.&lt;/p&gt;

&lt;p&gt;The event source can either be a Kafka &lt;em&gt;topic&lt;/em&gt; or a &lt;em&gt;Kinesis Data stream&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;You set the computing power of the Flink instance by setting the &lt;em&gt;parallelism&lt;/em&gt; value. This value decides the number of &lt;a href="https://docs.aws.amazon.com/kinesisanalytics/latest/java/how-scaling.html" rel="noopener noreferrer"&gt;&lt;em&gt;KPUs&lt;/em&gt;&lt;/a&gt; that AWS will provision for your application. You can also enable auto-scaling to ensure that the application can handle an increased throughput if in case your initial parallelism isn't sufficient.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion:
&lt;/h2&gt;

&lt;p&gt;Windowing is at the heart of the Flink framework. In addition to what we saw in the window assigners, it is also possible to build your own custom windowing logic. Also, like any other keyed data stream, you can make use of state if such functionality is needed to perform computations. Once you gain enough understanding of the source event stream that you're working with, windowing can solve some complex problems that you might have. With that being said, experiment with windows and let me know if you have questions!&lt;/p&gt;

</description>
      <category>flink</category>
      <category>cep</category>
      <category>kda</category>
      <category>serverless</category>
    </item>
    <item>
      <title>Managing compliance the Serverless way with AWS Config custom rules</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Mon, 18 Apr 2022 17:19:52 +0000</pubDate>
      <link>https://forem.com/mohdizzy/managing-compliance-the-serverless-way-with-aws-config-custom-rules-fg</link>
      <guid>https://forem.com/mohdizzy/managing-compliance-the-serverless-way-with-aws-config-custom-rules-fg</guid>
      <description>&lt;p&gt;&lt;strong&gt;AWS Config&lt;/strong&gt; is a tool for continuous monitoring and conducting compliance checks across your resources in AWS. With the growing services in an AWS account, managing security and conducting routine audits can become cumbersome. This is where AWS Config can do all the heavy lifting with a few clicks. With detailed reports of the configuration history, this service simplifies auditing and maintaining compliance with any internal guidelines you may have.&lt;/p&gt;

&lt;p&gt;In this article, we will take a quick look at how to get started with AWS Config and also explore on how you can create your own custom rules that allow you to run evaluations against specific resources, along with using SNS for delivering changes in configurations.&lt;/p&gt;

&lt;h2&gt;
  
  
  Overview:
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;How to set up AWS Config&lt;/li&gt;
&lt;li&gt;SNS Event rule&lt;/li&gt;
&lt;li&gt;Brief about Conformance packs&lt;/li&gt;
&lt;li&gt;Creating custom AWS Config rule with Lambda&lt;/li&gt;
&lt;li&gt;Conclusion&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Getting started:
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Setting up AWS managed rules:
&lt;/h3&gt;

&lt;p&gt;In the first instance of using the service, you will be asked to create rules either for AWS resources or for the third party resource apps that your account may have been integrated with (eg. DataDog, Jfrog, Stackery etc). This can always be altered later by going to Settings at the dashboard.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnswc1fjvif49y4j90mqr.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnswc1fjvif49y4j90mqr.png" alt="AWS Config" width="800" height="254"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;After specifying the bucket name for delivering the Config reports, you can optionally choose to associate an SNS topic which will then trigger notifications whenever there are configuration changes to your resources that Config is able to capture. We will get to set this up a little later.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcnd3wlrcq74psaf9xyfp.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcnd3wlrcq74psaf9xyfp.png" alt="Choose S3 bucket and SNS topic" width="800" height="313"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;On the next page, you can handpick the AWS managed rules for the resources against which the evaluation should be done. These rules will be triggered in two ways (depending on rule type)&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;By frequency (value between 1h to 24h)&lt;/li&gt;
&lt;li&gt;Configuration changes (either by &lt;strong&gt;changes to some specific resource Id or changes to resources based on associated tags&lt;/strong&gt;)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffr0yn9by21ghljma7x0s.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffr0yn9by21ghljma7x0s.png" alt="Rules for evaluation" width="800" height="370"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  SNS Event rule:
&lt;/h2&gt;

&lt;p&gt;For configuring the SNS event notifications, head over to the Eventbridge dashboard and set up the AWS event for Config.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fy3bswzvfosw3qprjg8y5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fy3bswzvfosw3qprjg8y5.png" alt="SNS Config event type" width="800" height="357"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;You can configure the event type and custom JSON message for that event type. For now, we will stick to a simple setup for triggering notifications when there is a configuration change for all resources.&lt;/p&gt;

&lt;p&gt;Proceed to associate the SNS topic that was provided in the initial setup at the AWS Config dashboard.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conformance packs:
&lt;/h2&gt;

&lt;p&gt;Conformance packs are preset templates containing rules and configurations that provide a general-purpose framework for security and operational governance checks. Think of them as starter templates/rules which you should ideally extend to build compliance regulations that are in line with the organization’s standards.&lt;/p&gt;

&lt;p&gt;Here I have chosen the Serverless conformance pack that includes checks against the usual serverless resources: DynamoDb, API Gateway, Lambda and VPC endpoint setup to name some of the items in the list.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fk5uju7yorm4lddz1i30a.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fk5uju7yorm4lddz1i30a.png" alt="Serverless conformance pack" width="800" height="369"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Creating your own Config custom rule :
&lt;/h2&gt;

&lt;p&gt;Let’s consider a specific scenario wherein you want to ensure that all Lambda functions belonging to a specific VPC do not have an open 0.0.0.0/0 outbound rule in the Security group.&lt;/p&gt;

&lt;p&gt;On the rule creation page, choose the custom Lambda rule.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsvveorqaahsjrekd1n6h.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsvveorqaahsjrekd1n6h.png" alt="Custom Lambda rule" width="800" height="376"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Fill in the function ARN (you will need to create a function beforehand), the trigger type, and rule parameters (here I have specified as &lt;strong&gt;vpcid&lt;/strong&gt;).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9dceknvjflaah6l24378.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9dceknvjflaah6l24378.png" alt="Trigger type and Rule parameter" width="800" height="351"&gt;&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;const configClient = require("aws-sdk/clients/configservice"); 
const ec2Client = require("aws-sdk/clients/ec2");
const lambdaClient = require("aws-sdk/clients/lambda");

const ec2 = new ec2Client({ region: process.env.AWS_REGION });
const lambda = new lambdaClient({ region: process.env.AWS_REGION });
const config = new configClient(); 
const COMPLIANCE_STATES = {
  COMPLIANT: "COMPLIANT",
  NON_COMPLIANT: "NON_COMPLIANT",
  NOT_APPLICABLE: "NOT_APPLICABLE",
};

// Checks whether the invoking event is ScheduledNotification
function isScheduledNotification(invokingEvent) {
  return invokingEvent.messageType === "ScheduledNotification";
}

// Evaluates the configuration of the egress rule in the security group
const evaluateCompliance = async (vpcId) =&amp;gt; {
  const getLambdaInVPC = await ec2
    .describeNetworkInterfaces({
      Filters: [{ Name: "vpc-id", Values: [vpcId] }],
    })
    .promise();
  const lambdaList = [];

  getLambdaInVPC.NetworkInterfaces.forEach((item) =&amp;gt; {
    lambdaList.push(
      item.Description.split("AWS Lambda VPC ENI-")[1].split("-")[0]
    );
  });
  if (lambdaList) {
    const uniqueLambdaNames = [...new Set(lambdaList)];

    const getLambdaSGId = [];
    for (const item in uniqueLambdaNames) {
      const lambdaInfo = await lambda
        .getFunction({ FunctionName: uniqueLambdaNames[item] })
        .promise();
      lambdaInfo.Configuration.VpcConfig.SecurityGroupIds.forEach((item) =&amp;gt; {
        getLambdaSGId.push(item);
      });
    }
    const uniqueSGId = [...new Set(getLambdaSGId)];

    let complianceSGList = [];
    uniqueSGId.forEach((id) =&amp;gt; {
        complianceSGList.push({
            Id: id,
            Compliance: COMPLIANCE_STATES.COMPLIANT,
          })
    });
    const checkSGEgressInternetRule = await ec2
      .describeSecurityGroups({
        GroupIds: [...uniqueSGId],
        Filters: [{ Name: "egress.ip-permission.cidr", Values: ["0.0.0.0/0"] }],
      })
      .promise();

    if (checkSGEgressInternetRule.SecurityGroups) {
      checkSGEgressInternetRule.SecurityGroups.forEach((item) =&amp;gt; {
        const indexToUpdate = complianceSGList.findIndex((obj =&amp;gt; obj.Id == item.GroupId));
        complianceSGList[indexToUpdate].Compliance = COMPLIANCE_STATES.NON_COMPLIANT;
      });
      console.log(JSON.stringify(complianceSGList))
      return complianceSGList;
    }
  } else {
    return [{ Id: vpcId }, { Compliance: COMPLIANCE_STATES.NOT_APPLICABLE }];
  }
};

exports.handler = async (event, context) =&amp;gt; {
  // Parses the invokingEvent and ruleParameters values, which contain JSON objects passed as strings.
  console.log(JSON.stringify(event));
  const invokingEvent = JSON.parse(event.invokingEvent);
  const ruleParameters = JSON.parse(event.ruleParameters);


  if (isScheduledNotification(invokingEvent)) {
    // Passes the vpcid from the config rule parameter
    const checkCompliance = await evaluateCompliance(ruleParameters.vpcid);
    const putEvaluationsRequest = {
      ResultToken: event.resultToken,
    };
    putEvaluationsRequest.Evaluations = [];
    checkCompliance.forEach((item) =&amp;gt; {
      putEvaluationsRequest.Evaluations.push({
        ComplianceResourceType: "AWS::EC2::SecurityGroup",
        ComplianceResourceId: item.Id,
        ComplianceType: item.Compliance,
        OrderingTimestamp: new Date(),
      });
    });
    // Sends the evaluation results to AWS Config.
    const configResponse = await config.putEvaluations(putEvaluationsRequest).promise();
    return configResponse
  } else {
    console.log("Not a scheduled event");
  }
};
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The logic is essentially doing this,&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Pull all Lambda functions associated with the provided VPC ID (part of the incoming Config event as &lt;strong&gt;rule parameter&lt;/strong&gt;)&lt;/li&gt;
&lt;li&gt;Get the Security group details.&lt;/li&gt;
&lt;li&gt;Filter those Security group IDs that have an outbound rule to 0.0.0.0/0.&lt;/li&gt;
&lt;li&gt;Return the evaluation result back to Config.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;When you manually invoke this rule’s evaluation, any changes to security group will trigger an SNS event. You can review the results at Config dashboard and see the list of those Security group IDs that are non-compliant.&lt;/p&gt;

&lt;p&gt;We can also set up an automated remediation response when the evaluation fails. In this case, choosing the &lt;em&gt;CloseSecurityGroup&lt;/em&gt; action from the list.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F02fzpgwdt41r7ijzacx0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F02fzpgwdt41r7ijzacx0.png" alt="Automatic remediation after compliance failure" width="800" height="537"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;If in case a preset remediation action doesn’t exist, we could leverage the SNS event as a trigger for another Lambda function which will then carry out the remediation action for us, thereby &lt;strong&gt;automating the whole process of event detection and remediation without any manual intervention&lt;/strong&gt;. The flow would then look something like this,&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fio2aqt88seuupmrcomax.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fio2aqt88seuupmrcomax.jpeg" alt="Config event flow" width="800" height="360"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion:
&lt;/h2&gt;

&lt;p&gt;Compliance setups can get complicated as the number of services expands within an AWS account. With Config, it’s also possible to aggregate evaluation reports across multiple accounts which simplifies auditing when there are distributed resources in different regions belonging to other accounts.&lt;/p&gt;

&lt;p&gt;Different business units within an organization may have varied audit requirements and it can certainly get a whole lot more difficult to control and manage. Especially in cases when an organization is wanting to maintain strict compliance as per PCI standards, AWS Config can definitely be of great help in offloading those internal audits and security management tasks.&lt;/p&gt;

</description>
      <category>serverless</category>
      <category>compliance</category>
      <category>awsconfig</category>
      <category>lambd</category>
    </item>
    <item>
      <title>Testing stateful Flink applications locally</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Sat, 02 Oct 2021 18:23:39 +0000</pubDate>
      <link>https://forem.com/aws-builders/testing-stateful-flink-applications-locally-31jh</link>
      <guid>https://forem.com/aws-builders/testing-stateful-flink-applications-locally-31jh</guid>
      <description>&lt;p&gt;Before reading this article, if you’re new to Flink then consider heading over to this &lt;a href="https://mohdizzy.medium.com/serverless-complex-event-processing-with-apache-flink-60336d2abcd9" rel="noopener noreferrer"&gt;link&lt;/a&gt; where I provide an overview about Flink and explain some basic concepts.&lt;/p&gt;

&lt;p&gt;In this article, we’ll be exploring on how to set up your Flink applications to run tests locally. As the logic of your Flink applications gets complex over time it becomes even more difficult to debug and understand the behaviour of the unique input events that the application may receive. Specifically, with state-based applications, the ability to test locally facilitates faster development as these tests emulate the complete end-to-end flow with Flink without actually deploying the project.&lt;/p&gt;

&lt;h2&gt;
  
  
  Setup your pom.xml
&lt;/h2&gt;

&lt;p&gt;With the assumption of using Maven as the build mechanism for your Flink project, insert the following dependencies into your pom.xml. Note the scope of the dependencies.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;dependency&amp;gt;
    &amp;lt;groupId&amp;gt;org.apache.flink&amp;lt;/groupId&amp;gt;
    &amp;lt;artifactId&amp;gt;flink-runtime_2.11&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;1.11.1&amp;lt;/version&amp;gt;
    &amp;lt;scope&amp;gt;test&amp;lt;/scope&amp;gt;
    &amp;lt;classifier&amp;gt;tests&amp;lt;/classifier&amp;gt;
&amp;lt;/dependency&amp;gt;

&amp;lt;dependency&amp;gt;
    &amp;lt;groupId&amp;gt;org.apache.flink&amp;lt;/groupId&amp;gt;
    &amp;lt;artifactId&amp;gt;flink-streaming-java_2.11&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;1.11.1&amp;lt;/version&amp;gt;
    &amp;lt;scope&amp;gt;test&amp;lt;/scope&amp;gt;
    &amp;lt;classifier&amp;gt;tests&amp;lt;/classifier&amp;gt;
&amp;lt;/dependency&amp;gt;

&amp;lt;dependency&amp;gt;
    &amp;lt;groupId&amp;gt;org.assertj&amp;lt;/groupId&amp;gt;
    &amp;lt;artifactId&amp;gt;assertj-core&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;3.16.1&amp;lt;/version&amp;gt;
&amp;lt;/dependency&amp;gt;

&amp;lt;dependency&amp;gt;
    &amp;lt;groupId&amp;gt;org.junit.jupiter&amp;lt;/groupId&amp;gt;
    &amp;lt;artifactId&amp;gt;junit-jupiter-api&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;5.7.1&amp;lt;/version&amp;gt;
&amp;lt;/dependency&amp;gt;

&amp;lt;dependency&amp;gt;
    &amp;lt;groupId&amp;gt;org.junit.jupiter&amp;lt;/groupId&amp;gt;
    &amp;lt;artifactId&amp;gt;junit-jupiter-engine&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;5.7.1&amp;lt;/version&amp;gt;
    &amp;lt;scope&amp;gt;test&amp;lt;/scope&amp;gt;
&amp;lt;/dependency&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h2&gt;
  
  
  Writing tests
&lt;/h2&gt;

&lt;p&gt;We leverage Junit which is a framework to write tests for Java based applications. The Flink framework also additional classes to support testing the applications that we will explore in the sample below.&lt;/p&gt;

&lt;p&gt;Let’s consider the following sample test which is present under the &lt;em&gt;../src/test/SingleEventTest.java&lt;/em&gt; of your project.&lt;/p&gt;


&lt;div class="ltag_gist-liquid-tag"&gt;
  
&lt;/div&gt;



&lt;p&gt;We’ll get to what the tests do but before that, the above test takes the following into considerations.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The input source of the events is Kafka. (not directly relevant in the test, but something to note when it comes to deserializing the events)&lt;/li&gt;
&lt;li&gt;The Flink application has Side Outputs and therefore makes use of the OutputTag util class. Side outputs are great when your application needs to send out different types of outputs from Flink that are not intended to be the main output. In this sample, the application needs to send the events to a separate Kafka topic when the processing doesn’t follow the intended path.&lt;/li&gt;
&lt;li&gt;The events that the application consumes are JSON objects.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  testFlinkJob():
&lt;/h3&gt;

&lt;p&gt;This test is focused on validating the core logic of the application i.e. the KeyedProcessFunction (ProcessorFunction). You will only be running tests on that one operator to validate the flow of execution.&lt;/p&gt;

&lt;p&gt;The API classes used are part of the Flink framework and are geared towards running these types of tests. The appropriate classes are to be used for keyed streams and unkeyed streams for which the Flink operator is set up accordingly.&lt;/p&gt;

&lt;p&gt;The success/failure of the test is determined with the help of Junit assertions. In the above sample, we only check if output was generated for a given event or not, and also to check if a sideoutput has been generated for a specific test event. We could get into more detail by validating the individual keys of the output event with the appropriate assert statements.&lt;/p&gt;

&lt;h3&gt;
  
  
  endToEndTest():
&lt;/h3&gt;

&lt;p&gt;This test simulates the entire Flink framework while executing the input events that you specify as part of the test.&lt;/p&gt;

&lt;p&gt;The variation here is we setup a fakeSource() and a fakeSink by using the SourceFunction and SinkFunction functions, both of which are part of the Flink APIs. The idea is understand how the application behaves if it were to be deployed. You will notice how the inputs are sent within a loop and not by using any of the test API classes from Flink as they were actually being sent from the source.&lt;/p&gt;

&lt;p&gt;Just like the previous test, we use Junit assertions to validate the success of a test.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;One key difference that you will notice is that in the endToEndTest() function, all Flink operations like the onTimer() method are being called as per the flow. The same event when tested using the testFlinkJob() function will actually not trigger the timer.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Something to note is that if the application requires some custom steps in the deserialization process, it would be prudent to include that as part of your tests as well. You will notice the deserializer being called in the createEvent() and in the SourceFunction() function because in my sample application I have a custom Deserializer and Serializer. Since the events from Kafka are in the form bytes, post conversion to JSON string, if there are any transformations required on the JSON object it would make sense to perform that step in the Deserializer and not have a separate function for that. This ensures that the complete flow of the event from the deserializer step to Flink generating an output is tested even before pushing your code to a running instance.&lt;/p&gt;

&lt;h2&gt;
  
  
  Optional — Test coverage:
&lt;/h2&gt;

&lt;p&gt;If you would like to understand your test coverage and generate test reports, that can be done quite easily by adding these plugins as in the plugins section of your pom.xml.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;plugin&amp;gt;
    &amp;lt;groupId&amp;gt;org.apache.maven.plugins&amp;lt;/groupId&amp;gt;
    &amp;lt;artifactId&amp;gt;maven-surefire-plugin&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;3.0.0-M5&amp;lt;/version&amp;gt;
&amp;lt;/plugin&amp;gt;&amp;lt;plugin&amp;gt;
    &amp;lt;groupId&amp;gt;org.jacoco&amp;lt;/groupId&amp;gt;
    &amp;lt;artifactId&amp;gt;jacoco-maven-plugin&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;0.8.6&amp;lt;/version&amp;gt;
    &amp;lt;configuration&amp;gt;
        &amp;lt;excludes&amp;gt;
            &amp;lt;exclude&amp;gt;**/serializers/*&amp;lt;/exclude&amp;gt;
            &amp;lt;exclude&amp;gt;**/FlinkJob.java&amp;lt;/exclude&amp;gt;
        &amp;lt;/excludes&amp;gt;
    &amp;lt;/configuration&amp;gt;
    &amp;lt;executions&amp;gt;
        &amp;lt;execution&amp;gt;
            &amp;lt;goals&amp;gt;
                &amp;lt;goal&amp;gt;prepare-agent&amp;lt;/goal&amp;gt;
            &amp;lt;/goals&amp;gt;
        &amp;lt;/execution&amp;gt;
        &amp;lt;execution&amp;gt;
            &amp;lt;id&amp;gt;jacoco-site&amp;lt;/id&amp;gt;
            &amp;lt;phase&amp;gt;package&amp;lt;/phase&amp;gt;
            &amp;lt;goals&amp;gt;
                &amp;lt;goal&amp;gt;report&amp;lt;/goal&amp;gt;
            &amp;lt;/goals&amp;gt;
        &amp;lt;/execution&amp;gt;
    &amp;lt;/executions&amp;gt;
&amp;lt;/plugin&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Closing comments:
&lt;/h2&gt;

&lt;p&gt;Writing tests as you code through the application logic ensures functional accuracy. State based applications can become complex to maintain because the execution of a Flink job may alter based on historical events, and putting the business logic through it’s paces with varied tests provides confidence in the development process.&lt;/p&gt;

&lt;p&gt;Adding log statements in info, error or debug mode at different places of your application makes a huge difference in following through the flow when the tests are being executed.&lt;/p&gt;

&lt;p&gt;Variations with the test event data for edge cases should be done more often as the events are keyed by specific values to enable state based processing.&lt;/p&gt;

&lt;p&gt;As a follow up with the testing process, checkpoints in Flink is an area that should be explored because, in the adverse situation that your Flink application hits a bump and restarts, the state is restored from the last successful checkpoint. It is possible to include the checkpointing mechanism as part of your tests and see how the overall application performs when something goes wrong. I’ll follow up soon on this front with an article shortly. Until then, give these tests a go and do reach out in case of any questions!&lt;/p&gt;

</description>
      <category>flink</category>
      <category>cep</category>
      <category>streamprocessing</category>
      <category>junit</category>
    </item>
    <item>
      <title>Serverless Complex Event Processing with Apache Flink</title>
      <dc:creator>Mohammed</dc:creator>
      <pubDate>Fri, 16 Apr 2021 09:46:19 +0000</pubDate>
      <link>https://forem.com/aws-builders/serverless-complex-event-processing-with-apache-flink-3dbm</link>
      <guid>https://forem.com/aws-builders/serverless-complex-event-processing-with-apache-flink-3dbm</guid>
      <description>&lt;h1&gt;
  
  
  What’s Apache Flink?
&lt;/h1&gt;

&lt;p&gt;Flink is a distributed processing engine that is capable of performing in-memory computations at scale for data streams. A data stream is a series of events such as transactions, user interactions on a website, application logs etc. from single or multiple sources. Streams in general can be of two types: bounded or unbounded. Bounded streams have a defined start and end, whereas unbounded streams once started, do not have a defined end. Flink is capable of handling both stream types with hyper scalability and state management.&lt;/p&gt;

&lt;h1&gt;
  
  
  Why use Flink?
&lt;/h1&gt;

&lt;p&gt;Some of the common use cases with Flink can be grouped into three broad categories; &lt;em&gt;Event driven applications, Data analytics, Data pipeline (ETL workloads)&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;em&gt;Event driven applications&lt;/em&gt;&lt;/strong&gt;: Building reactive applications with usage of state when certain events occur are classified as event driven applications. Stateful application processing is one of Flink’s core features. Let’s say if you would like to generate notifications/alerts based on certain events coming through from more than one source, Flink allows you to do so by maintaining an internal state so as to correlate events and determine if an alert needs to be sent out or not.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;em&gt;Data analytics&lt;/em&gt;&lt;/strong&gt;: Analyzing patterns by gathering insights from raw data in real time is possible with Flink. Consider a network monitoring system that detects outliers by consuming geographically distributed events in real time. Reacting to a negative effect can quickly mitigate downtime in systems which otherwise would prove to be catastrophic in nature.&lt;/p&gt;

&lt;p&gt;Flink also supports a robust &lt;em&gt;CEP&lt;/em&gt; (Complex Event Processing) library that can be used for pattern matching for the event streams. With the ability of handling multiple trillion events per day at hyper speed, there is no limit to what can be achieved within the Flink layer.&lt;/p&gt;

&lt;h1&gt;
  
  
  Flink Concepts:
&lt;/h1&gt;

&lt;p&gt;Before we get a Flink job running on Amazon Kinesis Data Analytics platform, there are some base concepts to be understood on how the framework works in general. Let’s look at a sample job written in scala. (Note that Flink jobs can be written in Java, Scala or Python — only Table API is supported as part AWS KDA)&lt;/p&gt;


&lt;div class="ltag_gist-liquid-tag"&gt;
  
&lt;/div&gt;


&lt;p&gt;The goal of the above job is to correlate events from two sources and identify certain values within the events for producing an alert. The above job has two input Kinesis streams and the output (sink) is a Kinesis stream as well.&lt;/p&gt;

&lt;p&gt;Notice the use of the Kinesis connector for consumer and producer. The parameters for the consumer &amp;amp; producer are like this:&lt;br&gt;
&lt;em&gt;new FlinkKinesisConsumer(stream name, the schema (with which the stream events should be parsed with), configuration properties)&lt;br&gt;
new FlinkKinesisProducer(schema, configuration properties)&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Operators in Flink play an important role in datastream transformations especially when they are chained together. As part of the job, there are three operators that perform datastream transformations,&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;flatMap&lt;/strong&gt; — A Map function (can be of Rich type if need be) that extracts data from the input event which is a stringified JSON object. Of the three flapMaps, two of them extract &amp;amp; parse the relevant data into a defined model (the data modelling is based on POJO — plain old java object classes.) using &lt;a href="https://github.com/json-path/JsonPath" rel="noopener noreferrer"&gt;JsonPath&lt;/a&gt;, the third flapMap is a &lt;em&gt;RichCoFlatMapFunction&lt;/em&gt; that determines after correlating events from the two sources whether it should be forwarded to the sink or not.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;keyBy&lt;/strong&gt; — This produces a keyed datastream by logically partitioning a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. A keyed stream is required for setting up state management. We will discuss more on this shortly.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;connect&lt;/strong&gt; — Allow connecting two data streams by retaining their types and produces a single stream where the correlation logic can be applied with the help of state.&lt;/li&gt;
&lt;/ul&gt;


&lt;div class="ltag_gist-liquid-tag"&gt;
  
&lt;/div&gt;


&lt;p&gt;The above code is one of the flatMap functions applied to an event stream. Looking at the job and this flatMap function, what we accomplish is keyed data stream with the use of &lt;em&gt;keyBy&lt;/em&gt; operator chained after the flatMap.&lt;/p&gt;

&lt;p&gt;The final flatMap that connects two streams together is where we make use of state for processing the incoming events from both streams.&lt;/p&gt;


&lt;div class="ltag_gist-liquid-tag"&gt;
  
&lt;/div&gt;


&lt;p&gt;There are various types of state that Flink supports. We make use of &lt;em&gt;ValueState&lt;/em&gt; in this case to keep track of the events and the count. A &lt;em&gt;BookingCreated&lt;/em&gt; event may arrive before or after a &lt;em&gt;BookingEvent&lt;/em&gt;, and it’s in this case state plays an important role for keeping track of the events coming through. All events from both streams are keyed by the same parameter and therefore logically partitioned by the same hash. This allows correlating events of the same parameter concurrently. We clear the state and output the event with two values once a condition is fulfilled.&lt;/p&gt;

&lt;p&gt;When it comes to working with state, a stream needs to be keyed and access to state is available only in &lt;em&gt;Rich&lt;/em&gt; functions.&lt;/p&gt;

&lt;p&gt;An important aspect to keep track of is the parallelism of your application. A Flink program consists of multiple tasks (transformations/operators, data sources, and sinks). A task is split into several parallel instances for execution and each parallel instance processes a subset of the task’s input data. The number of parallel instances of a task is called its parallelism. Efficient setting of the parallelism is imperative to the scalability of your application.&lt;/p&gt;

&lt;p&gt;In this example, we have set the parallelism at the execution environment level to 2. However, since we would be using Kinesis Data Analytics, this setting can be dynamic in nature which we will see later.&lt;/p&gt;

&lt;h1&gt;
  
  
  Building &amp;amp; compiling the job:
&lt;/h1&gt;

&lt;p&gt;There are 3 ways to build the packaged .jar file for deploying your application.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Gradle&lt;/li&gt;
&lt;li&gt;Maven&lt;/li&gt;
&lt;li&gt;SBT&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Each of these have a defined setup that can be explored from here.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Note&lt;/em&gt;: When working with Kinesis Data Analytics, you want to ensure that the relevant dependencies match the Flink version that you will work with. As of date, KDA supports Flink 1.11.1.&lt;/p&gt;

&lt;h1&gt;
  
  
  Getting started on Amazon Kinesis Data Analytics (KDA):
&lt;/h1&gt;

&lt;p&gt;Amazon Kinesis Data Analytics is a managed serverless offering that allows you to setup the Flink engine for your streaming applications. There are no servers to manage, no minimum fee or setup cost, and you only pay for the resources your streaming applications consume. KDA integrates with Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, Amazon Elasticsearch Service, Amazon DynamoDB Streams, Amazon S3, custom integrations, and more using built-in connectors.&lt;/p&gt;

&lt;p&gt;With the application jar built, upload it to an S3 bucket and configure the KDA application to point to that S3 bucket object location. You probably want to keep logging turned on as it helps in narrowing down the problem when things go wrong with the application logic or when the job itself isn’t running.&lt;/p&gt;

&lt;p&gt;KDA allows automatic scaling of an application when the throughput exceeds beyond a certain limit or conversely scales down when usage is consistent for a certain period of time. AWS scales up/down your application automatically depending on these conditions,&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Your application scales up (increases parallelism) when your CPU usage remains at 75 percent or above for 15 minutes.&lt;/li&gt;
&lt;li&gt;Your application scales down (decreases parallelism) when your CPU usage remains below 10 percent for six hours.&lt;/li&gt;
&lt;li&gt;AWS will not reduce your application’s current Parallelism value to less than your application’s Parallelism setting.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;With a running job, the KDA console will look similar to this wherein you can explore various attributes having to do with the resource utilization of your application.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwk5xepoz9op6gfdyxe54.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwk5xepoz9op6gfdyxe54.png" alt="Alt Text" width="800" height="322"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;With every successful record flowing through the input stream, the record counter updates accordingly.&lt;/p&gt;

&lt;h1&gt;
  
  
  Conclusion:
&lt;/h1&gt;

&lt;p&gt;We have looked at some basic concepts of the Flink framework, and how powerful it is for event aggregation at hyper scale with ultra low latency. Kinesis Data Analytics for Flink does most of the heavy lifting in terms of configuration and scalability so the focus can be directed towards the application development. AWS does offer the Flink engine as part of their EMR service if you like precise control over how the cluster should be set up and the libraries that are used under the hood.&lt;/p&gt;

&lt;p&gt;Another core concept of Flink that is certainly worth exploring is windowing. Windowing allows processing of your streams into buckets of finite size. Windows can be processed based on time or on triggers. For a deep dive visit this &lt;a href="https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html" rel="noopener noreferrer"&gt;link&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The above code samples were only for illustrative purposes to show the working of Flink from a high level, however, feel free to drop a comment or DM on twitter if you would like to know more about the set up.&lt;/p&gt;

</description>
      <category>serverless</category>
      <category>apacheflink</category>
      <category>streamprocessing</category>
      <category>eventprocessing</category>
    </item>
  </channel>
</rss>
