<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Carlos Zambrano</title>
    <description>The latest articles on Forem by Carlos Zambrano (@czam01).</description>
    <link>https://forem.com/czam01</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F533983%2Fd0f20fb4-8b71-44e7-8cce-859d8dda5974.png</url>
      <title>Forem: Carlos Zambrano</title>
      <link>https://forem.com/czam01</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/czam01"/>
    <language>en</language>
    <item>
      <title>Adding Custom Operators on Amazon Managed Workflows for Apache Airflow</title>
      <dc:creator>Carlos Zambrano</dc:creator>
      <pubDate>Sun, 13 Dec 2020 18:15:37 +0000</pubDate>
      <link>https://forem.com/aws-builders/adding-custom-operators-on-amazon-managed-workflows-for-apache-airflow-1cg</link>
      <guid>https://forem.com/aws-builders/adding-custom-operators-on-amazon-managed-workflows-for-apache-airflow-1cg</guid>
      <description>&lt;h1&gt;
  
  
  Introduction to Apache Airflow on AWS (MWAA)
&lt;/h1&gt;

&lt;p&gt;Amazon Managed Workflows for Apache Airflow (MWAA) is a fully managed service, based on &lt;a href="https://airflow.apache.org/" rel="noopener noreferrer"&gt;Apache Airflow&lt;/a&gt;, that allows us to create, manage, and orchestrate data and machine learning pipelines in AWS.&lt;/p&gt;

&lt;p&gt;Currently, many customers run their pipelines using Apache Airflow on EKS, ECS, or EC2, where they have to spend a lot of time administering all of Apache Airflow's components, such as the Scheduler, Executor, Web Server, Web UI, Workers, database, and broker. They must also configure all security, authentication, authorization, and logging options themselves.&lt;/p&gt;

&lt;p&gt;With Amazon Managed Workflows for Apache Airflow (MWAA), AWS manages all the components related to instances, storage, software installation, integration with IAM SSO, logging (CloudWatch), and worker scaling, while still giving us the flexibility to add custom configurations and install operators, hooks, sensors, and plugins.&lt;/p&gt;

&lt;h1&gt;
  
  
  Adding our Slack plugin to MWAA
&lt;/h1&gt;

&lt;p&gt;As our Data &amp;amp; Analytics and Machine Learning projects grow, flexibility becomes more important. This leads us to use custom plugins within MWAA to create different types of tasks that fit our needs. Below are the steps to create a plugin (operators, hooks, sensors), validate that it was installed, and monitor it.&lt;/p&gt;

&lt;p&gt;To better illustrate this process, I will show you how to add a Slack notification plugin.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt; All the code in this blog post is in &lt;a href="https://github.com/czam01/mwaa" rel="noopener noreferrer"&gt;My GitHub Repo&lt;/a&gt;. &lt;/p&gt;

&lt;h2&gt;
  
  
  1. Plugin Directory Structure
&lt;/h2&gt;

&lt;p&gt;The plugin directory must have the following structure. Note that this example does not use sensors.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;py&lt;/span&gt;
    &lt;span class="o"&gt;|--&lt;/span&gt; &lt;span class="n"&gt;slack_plugin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;py&lt;/span&gt; 
&lt;span class="n"&gt;hooks&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;
    &lt;span class="o"&gt;|--&lt;/span&gt; &lt;span class="n"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;py&lt;/span&gt;
    &lt;span class="o"&gt;|--&lt;/span&gt; &lt;span class="n"&gt;slack_webhook_hook&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;py&lt;/span&gt;
&lt;span class="n"&gt;operators&lt;/span&gt;&lt;span class="o"&gt;/&lt;/span&gt;
    &lt;span class="o"&gt;|--&lt;/span&gt; &lt;span class="n"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;py&lt;/span&gt;
    &lt;span class="o"&gt;|--&lt;/span&gt; &lt;span class="n"&gt;slack_webhook_operator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;py&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  2. Content
&lt;/h2&gt;

&lt;p&gt;Create a slack_plugin.py file with the following content:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.plugins_manager&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;AirflowPlugin&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;hooks.slack_webhook_hook&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SlackWebhookHook&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;operators.slack_webhook_operator&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SlackWebhookOperator&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;slack_webhook_operator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;SlackWebhookOperator&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
  &lt;span class="k"&gt;pass&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;slack_webhook_hook&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;SlackWebhookHook&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
  &lt;span class="k"&gt;pass&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;slack_plugin&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;AirflowPlugin&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;

    &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;my_slack_plugin&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;       
    &lt;span class="n"&gt;hooks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;slack_webhook_hook&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;operators&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;slack_webhook_operator&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This code shows that we have to create a class for our plugin, plus classes for the hook and the operator (this example does not use sensors). Also, keep in mind that the import paths for custom operators are different from those of the operators that ship with Apache Airflow 1.10.12.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;PythonOperator&lt;/strong&gt; Path (comes with the default installation):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.operators.python_operator&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;PythonOperator&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;SlackWebhookOperator&lt;/strong&gt; Path (custom operator):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;operators.slack_webhook_operator&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SlackWebhookOperator&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
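
&lt;p&gt;The shorter import path works because Airflow adds its plugins folder to the Python import path, so the top-level &lt;code&gt;operators&lt;/code&gt; and &lt;code&gt;hooks&lt;/code&gt; packages can be imported directly. The following is only a sketch of that mechanism: the temporary folder and the stub class are stand-ins introduced for illustration, not the real plugin code.&lt;/p&gt;

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Stand-in for the extracted plugins folder (hypothetical temporary location).
plugins_root = Path(tempfile.mkdtemp())
package_dir = plugins_root / "operators"
package_dir.mkdir()
(package_dir / "__init__.py").write_text("")
# Stub module: the real file would contain the actual operator code.
(package_dir / "slack_webhook_operator.py").write_text(
    "class SlackWebhookOperator:\n    pass\n"
)

# Airflow puts the plugins folder on sys.path; here we do it by hand.
sys.path.insert(0, str(plugins_root))
importlib.invalidate_caches()

# Same dotted path as the custom operator import shown above.
module = importlib.import_module("operators.slack_webhook_operator")
print(module.SlackWebhookOperator.__name__)
```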



&lt;h2&gt;
  
  
  3. Install the Plugin
&lt;/h2&gt;

&lt;p&gt;To install the plugin you must be in the plugins folder of your project:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;MacBook-Pro-73:mwaa czam&lt;span class="nv"&gt;$ &lt;/span&gt; &lt;span class="nb"&gt;cd &lt;/span&gt;plugins
MacBook-Pro-73:mwaa czam&lt;span class="nv"&gt;$ &lt;/span&gt; &lt;span class="nb"&gt;chmod&lt;/span&gt; &lt;span class="nt"&gt;-R&lt;/span&gt; 755 &lt;span class="nb"&gt;.&lt;/span&gt;
MacBook-Pro-73:mwaa czam&lt;span class="nv"&gt;$ &lt;/span&gt; zip &lt;span class="nt"&gt;-r&lt;/span&gt; plugins.zip &lt;span class="nb"&gt;.&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
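
&lt;p&gt;If you prefer to build the archive from Python, the sketch below does roughly the same as the zip command above using only the standard library. The function name &lt;code&gt;build_plugins_zip&lt;/code&gt; is hypothetical, and unlike &lt;code&gt;zip -r&lt;/code&gt; it only includes .py files, which is an assumption made to keep the archive small.&lt;/p&gt;

```python
import zipfile
from pathlib import Path


def build_plugins_zip(plugins_dir, archive_name="plugins.zip"):
    """Zip the .py files under plugins_dir so they sit at the archive root."""
    root = Path(plugins_dir)
    archive_path = root / archive_name
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(root.rglob("*.py")):
            # Store paths relative to the plugins folder, e.g. hooks/__init__.py
            archive.write(path, path.relative_to(root).as_posix())
    return archive_path
```

&lt;p&gt;Calling &lt;code&gt;build_plugins_zip("plugins")&lt;/code&gt; from the project root would write plugins/plugins.zip, with &lt;code&gt;hooks/&lt;/code&gt; and &lt;code&gt;operators/&lt;/code&gt; at the top level of the archive.&lt;/p&gt;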



&lt;p&gt;Once you have created the plugins.zip file, upload it to the S3 bucket specified in your MWAA configuration:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Fs3-mwaa.png%3Fraw%3Dtrue" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Fs3-mwaa.png%3Fraw%3Dtrue" alt="S3 Configuration"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Once the plugins.zip file is uploaded to the specified path, go to the MWAA environment and select the newly uploaded file. Click Next and then Save. The environment then enters the Updating state and takes a few minutes to return to the Available state.&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Fupdate-mwaa.png%3Fraw%3Dtrue" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Fupdate-mwaa.png%3Fraw%3Dtrue" alt="Updating MWAA"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  4. Install Requirements
&lt;/h2&gt;

&lt;p&gt;The requirements.txt file lists the packages we need to install. In this case, the file should look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;apache&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;airflow&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;slack&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that you must repeat the same steps used for plugins.zip (upload the file to S3 and select it in the MWAA environment) so that MWAA recognizes requirements.txt.&lt;/p&gt;

&lt;h2&gt;
  
  
  5. Test the Custom Plugin
&lt;/h2&gt;

&lt;p&gt;To validate that the plugin was installed we must create a DAG and execute the SlackWebhookOperator task. The following code is for a task with SlackWebhookOperator:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;task1&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;SlackWebhookOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;task_1&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;http_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;slack_connection&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;I am the task 1&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;channel&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;#airflowchannel&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;dag&lt;/span&gt;
  &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To finish the configuration, add a connection in the Connections section that contains the webhook created in Slack.&lt;/p&gt;

&lt;p&gt;Go to the connections option.&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Foption-mwaa.png%3Fraw%3Dtrue" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Foption-mwaa.png%3Fraw%3Dtrue" alt="Connections Option"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now add an HTTP connection that looks like this: &lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Fhttp-connection-mwaa.png%3Fraw%3Dtrue" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Fhttp-connection-mwaa.png%3Fraw%3Dtrue" alt="Add Connection"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To create a webhook in Slack, follow &lt;a href="https://slack.com/intl/en-co/help/articles/115005265063-Incoming-webhooks-for-Slack" rel="noopener noreferrer"&gt;these&lt;/a&gt; steps. &lt;/p&gt;

&lt;p&gt;After making these configurations, we activate the DAG and verify in our Slack channel that the notifications begin to arrive.&lt;/p&gt;
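
&lt;p&gt;If no notifications arrive, it can help to test the webhook outside of Airflow. The sketch below builds the kind of JSON POST request that a Slack incoming webhook accepts, using only the standard library. The helper name &lt;code&gt;build_slack_request&lt;/code&gt; and the webhook URL are placeholders introduced here, and this is not the operator's actual implementation.&lt;/p&gt;

```python
import json
import urllib.request


def build_slack_request(webhook_url, message, channel=None):
    """Build (but do not send) a POST request for a Slack incoming webhook."""
    payload = {"text": message}
    if channel:
        # Channel overrides may be ignored depending on how the webhook was created.
        payload["channel"] = channel
    return urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL: replace with the webhook URL generated in Slack.
request = build_slack_request(
    "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXX",
    "I am the task 1",
    channel="#airflowchannel",
)
# To actually send it: urllib.request.urlopen(request)
```

&lt;p&gt;Sending the request with a real webhook URL should post the message to the channel, which confirms that the webhook itself works before you debug the Airflow connection.&lt;/p&gt;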

&lt;h2&gt;
  
  
  6. Monitor your Plugin
&lt;/h2&gt;

&lt;p&gt;To monitor our MWAA environment, we can review the log groups in CloudWatch, where we can see the status of the scheduler, the web server, the tasks, and each of the DAGs.&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Fcloudwatch-mwaa.png%3Fraw%3Dtrue" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Fcloudwatch-mwaa.png%3Fraw%3Dtrue" alt="Cloudwatch MWAA"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Additionally, within the Apache Airflow console, the Tree View shows the number of executions for each task and their respective status.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Fdag-monitoring-mwaa.png%3Fraw%3Dtrue" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fczam01%2Fmwaa%2Fblob%2Fmaster%2Fimages%2Fdag-monitoring-mwaa.png%3Fraw%3Dtrue" alt="Dag Monitoring"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h1&gt;
  
  
  Lessons Learned and Best Practices
&lt;/h1&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;When creating the environment, it is recommended to enable logging at the INFO level for all components. This gives you greater visibility into the status of the entire environment.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;When you need to execute custom tasks it is best to create custom operators and add them within a plugin.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;In the official Apache Airflow documentation you will find a large number of Operators, Hooks, and Sensors that you can use.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;There are different ways to interact with AWS services: you can call the boto3 library from a PythonOperator, or you can create an AWS connection and interact with the services through custom operators.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Pipelines can be created for data and analytics projects and also for &lt;a href="https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/contrib/operators/sagemaker_transform_operator/index.html" rel="noopener noreferrer"&gt;Machine Learning&lt;/a&gt;; you can use operators to connect with services such as SageMaker.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;All the content in this post can be found &lt;a href="https://github.com/czam01/mwaa" rel="noopener noreferrer"&gt;here&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;-- &lt;a href="https://dev.to/czam01"&gt;Carlos Zambrano&lt;/a&gt;&lt;/p&gt;

</description>
      <category>airflow</category>
      <category>mwaa</category>
      <category>aws</category>
      <category>analytics</category>
    </item>
  </channel>
</rss>
