<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Nazli Ander</title>
    <description>The latest articles on Forem by Nazli Ander (@nazliander).</description>
    <link>https://forem.com/nazliander</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F229249%2F108734b9-354d-4599-8b40-a7c07fd217d4.jpg</url>
      <title>Forem: Nazli Ander</title>
      <link>https://forem.com/nazliander</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/nazliander"/>
    <language>en</language>
    <item>
      <title>Realtime Analysis of Cryptocurrency Prices Using dbt, Materialize, Redpanda &amp; Metabase</title>
      <dc:creator>Nazli Ander</dc:creator>
      <pubDate>Mon, 07 Mar 2022 20:24:51 +0000</pubDate>
      <link>https://forem.com/nazliander/realtime-analysis-of-cryptocurrency-prices-using-dbt-materialize-redpanda-metabase-41n2</link>
      <guid>https://forem.com/nazliander/realtime-analysis-of-cryptocurrency-prices-using-dbt-materialize-redpanda-metabase-41n2</guid>
      <description>&lt;p&gt;Materialize organized an &lt;a href="https://materialize.com/resources/materialize-dbt-redpanda-virtual-hack-day/" rel="noopener noreferrer"&gt;Online Hack Day&lt;/a&gt; a while ago and provided a structured streaming setup using &lt;a href="https://docs.getdbt.com/docs/building-a-dbt-project/documentation" rel="noopener noreferrer"&gt;dbt&lt;/a&gt; (data transformation framework), &lt;a href="https://docs.redpanda.com/docs/quickstart/" rel="noopener noreferrer"&gt;Redpanda&lt;/a&gt; (queue), &lt;a href="https://materialize.com/docs/get-started/" rel="noopener noreferrer"&gt;Materialize&lt;/a&gt; (database) and &lt;a href="https://www.metabase.com/" rel="noopener noreferrer"&gt;Metabase&lt;/a&gt; (visualization).&lt;/p&gt;

&lt;p&gt;The initial setup used flight data from the &lt;a href="https://openskynetwork.github.io/opensky-api/rest.html" rel="noopener noreferrer"&gt;OpenSky API&lt;/a&gt; to aggregate flight information. I repurposed the structured streaming setup to show cryptocurrency data in a real-time dashboard.&lt;/p&gt;

&lt;p&gt;Without changing much of the setup, it was easy to create a new producer for the &lt;a href="https://developers.coinranking.com/api/documentation" rel="noopener noreferrer"&gt;CoinRanking&lt;/a&gt; API. Then I created two financial queries in dbt to answer the following questions in real time:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;What is the change in market cap per cryptocurrency within the last 20 minutes?&lt;/li&gt;
&lt;li&gt;What are the most deviating cryptocurrencies within the last 20 minutes?&lt;/li&gt;
&lt;li&gt;What are the average prices per cryptocurrency within the last 20 minutes?&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;I connected the queries (materialized views in Materialize) to Metabase, to visualize my answers. Here is a screenshot from the resulting example dashboard in Metabase:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffbzgo4fr8m6quurx7jvj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffbzgo4fr8m6quurx7jvj.png" alt="Metabase Dashboard" width="800" height="691"&gt;&lt;/a&gt; &lt;/p&gt;

&lt;p&gt;In this short retrospective write-up, I summarize the pipeline behind this analysis. I aim to show how easy it is to build a real-time financial analysis with this stack.&lt;/p&gt;

&lt;h2&gt;
  
  
  Pipeline Summary
&lt;/h2&gt;

&lt;p&gt;The pipeline created by Materialize ran in the following order, and I reused the same structure, changing only the producer:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Ingest real-time data with a Python producer into Redpanda&lt;/li&gt;
&lt;li&gt;Create a source from Redpanda in Materialize using dbt (raw data ingestion)&lt;/li&gt;
&lt;li&gt;Create a staging view in Materialize to type-cast JSON fields (staging ingested data)&lt;/li&gt;
&lt;li&gt;Create materialized views in Materialize to produce real-time windowed aggregations&lt;/li&gt;
&lt;li&gt;Use Metabase to visualize data&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The technologies relate to each other as the following diagram suggests:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcsiismfmiy612ssy1gd7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcsiismfmiy612ssy1gd7.png" alt="Pipeline Setup Summary" width="800" height="208"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The setup is initialized with &lt;a href="https://github.com/nazliander/mz-hack-day-2022/blob/main/sample_project/docker-compose.yml" rel="noopener noreferrer"&gt;a very thorough docker-compose file&lt;/a&gt;, created by @morsapaes. When we run &lt;code&gt;docker-compose up&lt;/code&gt;, the following services run on our local machine:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;IMAGE                                               COMMAND                  PORTS                                                                NAMES
sample_project_data-generator                       "python -u ./crypto.…"                                                                        data-generator
sample_project_dbt                                  "/bin/bash"              0.0.0.0:8002-&amp;gt;8080/tcp                                               dbt
metabase/metabase                                   "/app/run_metabase.sh"   0.0.0.0:3030-&amp;gt;3000/tcp                                               metabase
docker.vectorized.io/vectorized/redpanda:v21.11.3   "/entrypoint.sh 'red…"   0.0.0.0:8081-8082-&amp;gt;8081-8082/tcp, 0.0.0.0:9092-&amp;gt;9092/tcp, 9644/tcp   redpanda
materialize/materialized:v0.20.0                    "tini -- materialize…"   0.0.0.0:6875-&amp;gt;6875/tcp                                               materialized
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  CoinRanking Producer in a Nutshell
&lt;/h2&gt;

&lt;p&gt;To fetch real-time &lt;a href="https://developers.coinranking.com/api/documentation" rel="noopener noreferrer"&gt;CoinRanking&lt;/a&gt; data into Redpanda, we need a small script. The script reads data from CoinRanking periodically and then flushes what it read into the Redpanda instance. Since Redpanda is Kafka-compatible, this is quite straightforward with a Python Kafka client.&lt;/p&gt;

&lt;p&gt;The producer script reads data from the CoinRanking API, using &lt;a href="https://developers.coinranking.com/api/documentation#authentication" rel="noopener noreferrer"&gt;a token&lt;/a&gt;. Then it picks the required fields from the API response. Lastly, it ingests the list of coin information into Redpanda every 10 seconds. Here is the compact script:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
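&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;uuid&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;dumps&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;schedule&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;KafkaProducer&lt;/span&gt;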

&lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;basicConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;level&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;INFO&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;%(asctime)-7s %(levelname)-1s %(message)s&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;datefmt&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;%Y-%m-%d %H:%M:%S&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;handlers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
        &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;StreamHandler&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
    &lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;COIN_PAGE&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;https://api.coinranking.com/v2/coins&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;CRYPTO_TOPIC&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;crypto&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;

&lt;span class="n"&gt;TOKEN&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;TOKEN&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;HEADERS&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Authorization&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;access_token &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;TOKEN&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="n"&gt;FREQUENCY_INGESTION&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;produce_list_of_coin_dict_into_queue&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;list_of_dict&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;list&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bootstrap_servers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;redpanda:9092&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;coin_with_model&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;list_of_dict&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;send&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;CRYPTO_TOPIC&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;coin_with_model&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;encode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;uuid&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;uuid4&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nb"&gt;hex&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;encode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;The problem is: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;!&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;flush&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_json_api&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;page&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;tuple&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;get_request&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;page&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;headers&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;HEADERS&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;assert&lt;/span&gt; &lt;span class="n"&gt;get_request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;status_code&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;200&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Request not successful&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;get_request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;get_request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;status_code&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_coin_model&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;coin&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nb"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;uuid&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;coin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;uuid&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;coin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;name&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;symbol&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;coin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;symbol&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;btc_price&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;coin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;btcPrice&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;last_24h_volume&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;coin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;24hVolume&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;marketcap&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;coin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;marketCap&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;price&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;coin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;price&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;timestamp&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;now&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;strftime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;%Y-%m-%d %H:%M:%S&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Exception: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;coins_producer&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;all_coins&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;get_json_api&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;COIN_PAGE&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;coins_with_model&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;get_all_coins_with_model&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;all_coins&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;produce_list_of_coin_dict_into_queue&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;coins_with_model&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;__main__&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="nf"&gt;coins_producer&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;schedule&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;every&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;FREQUENCY_INGESTION&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;seconds&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;do&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;coins_producer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;schedule&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;run_pending&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
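
&lt;p&gt;The script calls &lt;code&gt;get_all_coins_with_model&lt;/code&gt;, which the listing above does not define. Here is a minimal sketch of what such a helper could look like. It assumes the API response nests the list of coins under &lt;code&gt;data.coins&lt;/code&gt;; the sample response and the shortened &lt;code&gt;get_coin_model&lt;/code&gt; are hypothetical stand-ins for illustration only.&lt;/p&gt;

```python
from datetime import datetime


def get_coin_model(coin):
    # Shortened stand-in for the get_coin_model function of the script:
    # keep a few fields and stamp the record with the ingestion time.
    return {
        "uuid": coin.get("uuid"),
        "symbol": coin.get("symbol"),
        "price": coin.get("price"),
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }


def get_all_coins_with_model(all_coins):
    # Assumption: the coin list sits under the "data" and "coins" keys.
    coins = all_coins.get("data", {}).get("coins", [])
    models = [get_coin_model(coin) for coin in coins]
    # The script's get_coin_model returns {} on failure; drop those records.
    return [model for model in models if model]


# Made-up response used only to exercise the helper.
sample_response = {
    "status": "success",
    "data": {"coins": [{"uuid": "abc", "symbol": "BTC", "price": "39000.1"}]},
}
coins_with_model = get_all_coins_with_model(sample_response)
print(coins_with_model[0]["symbol"])
```

&lt;p&gt;Filtering out empty dictionaries keeps failed conversions from reaching the queue as empty messages.&lt;/p&gt;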



&lt;p&gt;With this producer, we regularly ingest crypto data into a Redpanda topic called &lt;code&gt;crypto&lt;/code&gt;. The ingested data is then consumed asynchronously from the &lt;code&gt;crypto&lt;/code&gt; topic and used in our analyses.&lt;/p&gt;

&lt;h2&gt;
  
  
  Redpanda Source in Materialize
&lt;/h2&gt;

&lt;p&gt;The project uses the dbt adapter for Materialize. The adapter (&lt;code&gt;dbt-materialize&lt;/code&gt;) is a Python package available &lt;a href="https://pypi.org/project/dbt-materialize/" rel="noopener noreferrer"&gt;on PyPI&lt;/a&gt;. This package allows us to use SQL + Jinja statements to efficiently transform streaming data and keep the results continuously updated.&lt;/p&gt;

&lt;p&gt;In Materialize, a connected external data stream is called a &lt;code&gt;source&lt;/code&gt;. We create a source by declaring the queue connection in a simple DDL statement.&lt;/p&gt;

&lt;p&gt;Since we make use of dbt, this is as easy as using the following lines:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="p"&gt;{{&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;materialized&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'source'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;tags&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'crypto'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}}&lt;/span&gt;

&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="o"&gt;%&lt;/span&gt; &lt;span class="k"&gt;set&lt;/span&gt; &lt;span class="n"&gt;source_name&lt;/span&gt; &lt;span class="o"&gt;%&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="n"&gt;mz_generate_name&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'rc_coins'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="o"&gt;%&lt;/span&gt; &lt;span class="n"&gt;endset&lt;/span&gt; &lt;span class="o"&gt;%&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;SOURCE&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="n"&gt;source_name&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;KAFKA&lt;/span&gt; &lt;span class="n"&gt;BROKER&lt;/span&gt; &lt;span class="s1"&gt;'redpanda:9092'&lt;/span&gt; &lt;span class="n"&gt;TOPIC&lt;/span&gt; &lt;span class="s1"&gt;'crypto'&lt;/span&gt;
  &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="n"&gt;FORMAT&lt;/span&gt; &lt;span class="n"&gt;BYTES&lt;/span&gt;
  &lt;span class="n"&gt;VALUE&lt;/span&gt; &lt;span class="n"&gt;FORMAT&lt;/span&gt; &lt;span class="n"&gt;BYTES&lt;/span&gt;
&lt;span class="n"&gt;ENVELOPE&lt;/span&gt; &lt;span class="n"&gt;UPSERT&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After defining the source, the database recognizes the ingested data. However, the values arrive as raw bytes, so we first need to decode them as UTF-8 text and cast the result to &lt;code&gt;jsonb&lt;/code&gt;. This is required for column mapping, as our data is stored as JSON objects. In the end, we can map the column values by accessing and casting the object values:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="p"&gt;{{&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;materialized&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'view'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;tags&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'crypto'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}}&lt;/span&gt;

&lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="n"&gt;converted_casted&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="k"&gt;SELECT&lt;/span&gt; 
        &lt;span class="k"&gt;CAST&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CONVERT_FROM&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'utf8'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;jsonb&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="k"&gt;data&lt;/span&gt;
    &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;ref&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'rc_coins'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;data&lt;/span&gt;&lt;span class="o"&gt;-&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="s1"&gt;'uuid'&lt;/span&gt;&lt;span class="p"&gt;)::&lt;/span&gt;&lt;span class="n"&gt;string&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;uuid&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;data&lt;/span&gt;&lt;span class="o"&gt;-&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="s1"&gt;'name'&lt;/span&gt;&lt;span class="p"&gt;)::&lt;/span&gt;&lt;span class="n"&gt;string&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;data&lt;/span&gt;&lt;span class="o"&gt;-&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="s1"&gt;'symbol'&lt;/span&gt;&lt;span class="p"&gt;)::&lt;/span&gt;&lt;span class="n"&gt;string&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;data&lt;/span&gt;&lt;span class="o"&gt;-&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="s1"&gt;'btc_price'&lt;/span&gt;&lt;span class="p"&gt;)::&lt;/span&gt;&lt;span class="nb"&gt;numeric&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;btc_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;data&lt;/span&gt;&lt;span class="o"&gt;-&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="s1"&gt;'last_24h_volume'&lt;/span&gt;&lt;span class="p"&gt;)::&lt;/span&gt;&lt;span class="nb"&gt;numeric&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;last_24h_volume&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;data&lt;/span&gt;&lt;span class="o"&gt;-&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="s1"&gt;'marketcap'&lt;/span&gt;&lt;span class="p"&gt;)::&lt;/span&gt;&lt;span class="nb"&gt;numeric&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;marketcap&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;data&lt;/span&gt;&lt;span class="o"&gt;-&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="s1"&gt;'price'&lt;/span&gt;&lt;span class="p"&gt;)::&lt;/span&gt;&lt;span class="nb"&gt;double&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;data&lt;/span&gt;&lt;span class="o"&gt;-&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="s1"&gt;'timestamp'&lt;/span&gt;&lt;span class="p"&gt;)::&lt;/span&gt;&lt;span class="nb"&gt;timestamp&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;converted_casted&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Materialized Views in Materialize
&lt;/h2&gt;

&lt;p&gt;One of the great features of Materialize is its &lt;code&gt;materialized view&lt;/code&gt;: the result of a &lt;code&gt;SELECT&lt;/code&gt; query that is kept up to date incrementally as new data streams in.&lt;/p&gt;

&lt;p&gt;As in every structured streaming framework, a valid analysis benefits from a set of assumptions about windowing and the lateness of data (grace periods). Currently, Materialize uses the &lt;code&gt;mz_logical_timestamp()&lt;/code&gt; function to define windowing and grace periods. The &lt;code&gt;mz_logical_timestamp()&lt;/code&gt; function represents the current timestamp in milliseconds at the time of query execution.&lt;/p&gt;

&lt;p&gt;The questions require us to keep track of the last 20 minutes of data, so in theory we need to analyze the data in sliding windows. Following the explanation in the Materialize documentation, we can use &lt;code&gt;mz_logical_timestamp()&lt;/code&gt; to restrict the queries of our crypto analysis to the past 20 minutes.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="p"&gt;{{&lt;/span&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;materialized&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;'materializedview'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;tags&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'crypto'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}}&lt;/span&gt;

&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="o"&gt;#&lt;/span&gt; &lt;span class="mi"&gt;20&lt;/span&gt; &lt;span class="n"&gt;mins&lt;/span&gt; &lt;span class="o"&gt;#&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="o"&gt;%&lt;/span&gt; &lt;span class="k"&gt;set&lt;/span&gt; &lt;span class="n"&gt;slide_threshold&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'1200000'&lt;/span&gt; &lt;span class="o"&gt;%&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="n"&gt;with_casted_insertion&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;

    &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;extract&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;epoch&lt;/span&gt; &lt;span class="k"&gt;from&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="mi"&gt;1000&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;inserted_at&lt;/span&gt;
    &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="k"&gt;ref&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'stg_crypto'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt;

&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;with_casted_insertion&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="k"&gt;TRUE&lt;/span&gt;
    &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;mz_logical_timestamp&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;inserted_at&lt;/span&gt;
    &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;mz_logical_timestamp&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;inserted_at&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="p"&gt;{{&lt;/span&gt; &lt;span class="n"&gt;slide_threshold&lt;/span&gt; &lt;span class="p"&gt;}}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The producer did not provide an insertion date in milliseconds. With SQL we can convert the &lt;code&gt;timestamp&lt;/code&gt; field into milliseconds and call it &lt;code&gt;inserted_at&lt;/code&gt;. This new field serves as our reference point for windowing.&lt;/p&gt;
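
&lt;p&gt;To make the two conditions in the &lt;code&gt;WHERE&lt;/code&gt; clause concrete, here is a minimal Python sketch of the same check. It is only an illustration of the predicate, not how Materialize evaluates it internally; the function names &lt;code&gt;to_epoch_ms&lt;/code&gt; and &lt;code&gt;in_sliding_window&lt;/code&gt; are made up for this example.&lt;/p&gt;

```python
from datetime import datetime, timezone

# 20 minutes in milliseconds, mirroring slide_threshold in the dbt model.
SLIDE_THRESHOLD_MS = 20 * 60 * 1000


def to_epoch_ms(ts: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    return int(ts.timestamp() * 1000)


def in_sliding_window(now_ms: int, inserted_at_ms: int,
                      threshold_ms: int = SLIDE_THRESHOLD_MS) -> bool:
    """True when now is at or after inserted_at and strictly before
    inserted_at + threshold, which is what the two AND conditions express."""
    return now_ms >= inserted_at_ms and inserted_at_ms + threshold_ms > now_ms


# Hypothetical record inserted at 20:00 UTC, checked 5 and 25 minutes later.
inserted_at = to_epoch_ms(datetime(2022, 3, 7, 20, 0, tzinfo=timezone.utc))
after_5_min = inserted_at + 5 * 60 * 1000
after_25_min = inserted_at + 25 * 60 * 1000

print(in_sliding_window(after_5_min, inserted_at))   # still inside the window
print(in_sliding_window(after_25_min, inserted_at))  # already dropped out
```

&lt;p&gt;A record therefore stays in the view for exactly 20 minutes after its &lt;code&gt;inserted_at&lt;/code&gt; value and then falls out of the result.&lt;/p&gt;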

&lt;p&gt;The &lt;code&gt;mz_logical_timestamp()&lt;/code&gt; function and its usage are quite involved. I suggest reading &lt;a href="https://materialize.com/docs/sql/functions/now_and_mz_logical_timestamp/" rel="noopener noreferrer"&gt;the documentation page from Materialize&lt;/a&gt; to see other use cases and their explanations.&lt;/p&gt;

&lt;p&gt;On top of the materialized view created above, we can write the analytics queries to expose in a dashboarding tool such as Metabase. These analytics queries are dbt models as well, materialized as &lt;code&gt;materializedview&lt;/code&gt;. You can find the resulting queries &lt;a href="https://github.com/nazliander/mz-hack-day-2022/tree/main/sample_project/dbt/models/examples/marts/crypto" rel="noopener noreferrer"&gt;in the repository.&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Resulting Lineage and Last Words
&lt;/h2&gt;

&lt;p&gt;With the analytics queries in place, all we need to do is run the &lt;code&gt;crypto&lt;/code&gt;-tagged models from dbt once:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;dbt run &lt;span class="nt"&gt;-m&lt;/span&gt; tag:crypto
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output from dbt looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;19:01:19  Found 10 models, 0 tests, 0 snapshots, 0 analyses, 180 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
19:01:19
19:01:19  Concurrency: 1 threads (target='dev')
19:01:19
19:01:19  1 of 5 START source model public.rc_coins....................................... [RUN]
19:01:19  1 of 5 OK created source model public.rc_coins.................................. [CREATE SOURCE in 0.08s]
19:01:19  2 of 5 START view model public.stg_crypto....................................... [RUN]
19:01:19  2 of 5 OK created view model public.stg_crypto.................................. [CREATE VIEW in 0.05s]
19:01:19  3 of 5 START materializedview model public.fct_crypto_sliding_window............ [RUN]
19:01:19  3 of 5 OK created materializedview model public.fct_crypto_sliding_window....... [CREATE VIEW in 0.06s]
19:01:19  4 of 5 START materializedview model public.marketcap_changes.................... [RUN]
19:01:19  4 of 5 OK created materializedview model public.marketcap_changes............... [CREATE VIEW in 0.06s]
19:01:19  5 of 5 START materializedview model public.volatile_cryptos..................... [RUN]
19:01:20  5 of 5 OK created materializedview model public.volatile_cryptos................ [CREATE VIEW in 0.05s]
19:01:20
19:01:20  Finished running 1 source model, 1 view model, 3 materializedview models in 0.38s.
19:01:20
19:01:20  Completed successfully
19:01:20
19:01:20  Done. PASS=5 WARN=0 ERROR=0 SKIP=0 TOTAL=5
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The resulting dbt lineage diagram would be as follows:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8lnf21l5yy19ji9ea9fa.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8lnf21l5yy19ji9ea9fa.png" alt="dbt Lineage" width="800" height="197"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Real-time analytics is used to get immediate insights from financial data, news sources, social media reactions and healthcare. In most cases SQL is enough to obtain basic insights in such systems. Thus, dbt in combination with Materialize improves the efficiency of developing real-time analytics pipelines. The setup provided in the Hack Day was a great example of this.&lt;/p&gt;

&lt;p&gt;The biggest challenge after this point would be the maintenance of these pipelines. Luckily, there is sufficient information on integrating both Redpanda and Materialize with monitoring tools. As a next reading tip, I would nerdily suggest their documentation pages:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;a href="https://docs.redpanda.com/docs/cluster-management/monitoring/" rel="noopener noreferrer"&gt;RedPanda to Prometheus metrics&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://materialize.com/docs/ops/monitoring/" rel="noopener noreferrer"&gt;Materialize monitoring documentation page&lt;/a&gt;&lt;/li&gt;
&lt;/ol&gt;

</description>
      <category>database</category>
      <category>python</category>
      <category>datascience</category>
      <category>programming</category>
    </item>
    <item>
      <title>Using Google Cloud Platform Operators in Apache Airflow</title>
      <dc:creator>Nazli Ander</dc:creator>
      <pubDate>Wed, 25 Aug 2021 15:34:32 +0000</pubDate>
      <link>https://forem.com/nazliander/about-the-google-cloud-platform-operators-in-airflow-3ap7</link>
      <guid>https://forem.com/nazliander/about-the-google-cloud-platform-operators-in-airflow-3ap7</guid>
      <description>&lt;p&gt;Different Airflow operators create more possibilities when designing a scheduled workflow. Being aware of them improves how we deal with real-world problems.&lt;/p&gt;

&lt;p&gt;Many Airflow operators keep impressing me in my daily job. Recently, I played quite a bit with the GCP operators within Airflow. In this write-up, I would first like to give an overview of the Airflow Cloud Providers (called contrib operators before Airflow 2.0, see &lt;a href="https://airflow.apache.org/docs/apache-airflow-providers/" rel="noopener noreferrer"&gt;the providers documentation&lt;/a&gt;). Then I would like to share an example Directed Acyclic Graph (DAG) workflow created for this toy project.&lt;/p&gt;

&lt;h2&gt;
  
  
  Summary of the Airflow Cloud Providers
&lt;/h2&gt;

&lt;p&gt;The Airflow Google provider package has four main sub-groups of functionality. The same grouping applies to the Amazon Web Services (AWS) provider package:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;1. Operators:&lt;/strong&gt; They are full-fledged operations, enabling us to execute read/create/delete/update/trigger tasks. Operators are not limited to &lt;a href="https://en.wikipedia.org/wiki/Create,_read,_update_and_delete" rel="noopener noreferrer"&gt;CRUD&lt;/a&gt; operations on datasets; they can also invoke a Cloud Function. Some examples and their use cases:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/bigquery/index.html#airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteDatasetOperator" rel="noopener noreferrer"&gt;BigQueryDeleteTableOperator&lt;/a&gt;: In the end of a BigQuery to Google Cloud Storage (GCS) operation, we might need to delete the existing table.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/gcs/index.html?highlight=gcscreatebucketoperator#airflow.providers.google.cloud.operators.gcs.GCSCreateBucketOperator" rel="noopener noreferrer"&gt;GCSCreateBucketOperator&lt;/a&gt;: To store a set of dataset partitions for specific dates, we might need to create a bucket in the beginning of a DAG.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/functions/index.html?highlight=cloudfunctioninvokefunctionoperator#airflow.providers.google.cloud.operators.functions.CloudFunctionInvokeFunctionOperator" rel="noopener noreferrer"&gt;CloudFunctionInvokeFunctionOperator&lt;/a&gt;: In case we update a certain point of a Cloud Function, we might need to test it in the end of a DAG.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;2. Hooks:&lt;/strong&gt; They are flexible clients, enabling us to interact with cloud providers. Hooks are the underlying clients used by the operators, and we can build customised functions (Python callables) or operators with them. Some examples are listed below. Each includes methods for CRUD operations on the service named in its class name:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/bigquery/index.html?highlight=bigqueryhook#airflow.providers.google.cloud.hooks.bigquery.BigQueryHook" rel="noopener noreferrer"&gt;BigQueryHook&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/gcs/index.html?highlight=gcshook#airflow.providers.google.cloud.hooks.gcs.GCSHook" rel="noopener noreferrer"&gt;GCSHook&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/mlengine/index.html" rel="noopener noreferrer"&gt;MLEngineHook&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;3. Sensors:&lt;/strong&gt; They are simply checkers. They check whether data exists in a certain location, and are mostly used to verify that a certain operation has completed. Some examples and their use cases:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/bigquery/index.html?highlight=bigquerytablepartitionexistencesensor#airflow.providers.google.cloud.sensors.bigquery.BigQueryTablePartitionExistenceSensor" rel="noopener noreferrer"&gt;BigQueryTablePartitionExistenceSensor&lt;/a&gt;: You might need to run a sequential task to process the latest partition right after you check that is created by another task.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/gcs/index.html?highlight=gcsobjectexistencesensor#airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor" rel="noopener noreferrer"&gt;GCSObjectExistenceSensor&lt;/a&gt;: You might need to run a sequential task to process an object within a certain bucket right after you check that is created by another task.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;4. Transfers:&lt;/strong&gt; They are simply data transport operations. They enable us to move data from one bucket to another one. They can also move data from one service to another, even if the service belongs to another cloud provider. Some examples are listed below:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/transfers/gcs_to_bigquery/index.html?highlight=gcstobigqueryoperator#airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator" rel="noopener noreferrer"&gt;GCSToBigQueryOperator&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/transfers/s3_to_gcs/index.html?highlight=s3togcsoperator#airflow.providers.google.cloud.transfers.s3_to_gcs.S3ToGCSOperator" rel="noopener noreferrer"&gt;S3ToGCSOperator&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs/index.html?highlight=facebookadsreporttogcsoperator#airflow.providers.google.cloud.transfers.facebook_ads_to_gcs.FacebookAdsReportToGcsOperator" rel="noopener noreferrer"&gt;FacebookAdsReportToGcsOperator&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;GCP operators in Airflow can be summarised as in the following chart:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxubz2hiu744xsj42v41e.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxubz2hiu744xsj42v41e.png" alt="Providers sub-groups of Airflow functionalities"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To have a functioning setup for Google Cloud operations, we need a GCP connection (id) in Airflow. The GCP connection can be set via configuration (some DevOps effort), or through the Airflow Web UI, as explained &lt;a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/connections/gcp.html" rel="noopener noreferrer"&gt;here&lt;/a&gt;. To enable authorisation, each GCP task that we create needs to refer to the GCP connection id.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Example GCP DAG
&lt;/h2&gt;

&lt;p&gt;The example DAG is shown in the following chart. With this DAG, I aim to export a partitioned table to Google Cloud Storage (GCS), then compose the multiple files generated by the export into one.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fi5158i17a4enidlh2xat.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fi5158i17a4enidlh2xat.png" alt="Example DAG"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Each task within this DAG can be considered as a kind of GCP configuration. They are explained as follows:&lt;/p&gt;

&lt;h3&gt;
  
  
  1. BigQuery to GCS:
&lt;/h3&gt;

&lt;p&gt;The task retrieves a partitioned table from BigQuery and exports it into a GCS bucket. For each partition we create a separate gzipped text file (&lt;code&gt;compression&lt;/code&gt;, &lt;code&gt;export_format&lt;/code&gt;). As a set of assumptions, I do not want a header row (&lt;code&gt;print_header&lt;/code&gt;), and I use a comma (&lt;code&gt;,&lt;/code&gt;) as the text file delimiter (&lt;code&gt;field_delimiter&lt;/code&gt;). The partition files are named using the wildcard (&lt;code&gt;*&lt;/code&gt;) character at the end of the file name (&lt;code&gt;destination_cloud_storage_uris&lt;/code&gt;).&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.models&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;DAG&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.providers.google.cloud.transfers.bigquery_to_gcs&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;BigQueryToGCSOperator&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.utils.dates&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;days_ago&lt;/span&gt;

&lt;span class="c1"&gt;# Define configuration global variables
&lt;/span&gt;
&lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;DAG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcp_dag&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;schedule_interval&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;start_date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;days_ago&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;tags&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;example&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;

    &lt;span class="n"&gt;bigquery_to_gcs&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;BigQueryToGCSOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;gcp_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcp_connection_id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;bigquery_to_gcs&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;compression&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;GZIP&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;export_format&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;CSV&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;field_delimiter&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;print_header&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;source_project_dataset_table&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;DATASET_NAME&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;.&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;TABLE&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;destination_cloud_storage_uris&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;
            &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gs://&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;DATA_EXPORT_BUCKET_NAME&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;EXPECTED_FILE_NAME&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;-*.csv.gz&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Define other operations
&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h3&gt;
  
  
  2. GCS Compose:
&lt;/h3&gt;

&lt;p&gt;GCP operators do not provide a full-fledged solution when it comes to the problem of composing multiple text files into one gzip file. Thus we need to create our own customised functions (Python callable) or operators (see &lt;a href="https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html" rel="noopener noreferrer"&gt;this documentation&lt;/a&gt;).&lt;/p&gt;

&lt;p&gt;The Python Operator, in my example DAG, calls a mysterious function (&lt;code&gt;compose_files_into_one&lt;/code&gt;) with &lt;code&gt;bucket_name&lt;/code&gt;, &lt;code&gt;source_object_prefix&lt;/code&gt;, &lt;code&gt;destination_object&lt;/code&gt;, &lt;code&gt;gcp_conn_id&lt;/code&gt; parameters. Those parameters are provided as keys in the Python Operator's &lt;code&gt;op_kwargs&lt;/code&gt; argument.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.models&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;DAG&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.operators.python&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;PythonOperator&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.utils.dates&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;days_ago&lt;/span&gt;

&lt;span class="c1"&gt;# Define configuration global variables
&lt;/span&gt;
&lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;DAG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcp_dag&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;schedule_interval&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;start_date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;days_ago&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;tags&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;example&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;

    &lt;span class="c1"&gt;# Previous operations
&lt;/span&gt;
    &lt;span class="n"&gt;compose_files&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PythonOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcs_compose&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;python_callable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;compose_files_into_one&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;op_kwargs&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;bucket_name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;DATA_EXPORT_BUCKET_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;source_object_prefix&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;EXPECTED_FILE_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;destination_object&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;EXPECTED_FILE_NAME&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;.csv.gz&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcp_conn_id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcp_connection_id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
            &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Any other operations
&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
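
&lt;p&gt;The way &lt;code&gt;op_kwargs&lt;/code&gt; reaches the callable can be pictured with plain Python: the dictionary is unpacked into keyword arguments. The sketch below imitates only that unpacking and does not use Airflow. The helper &lt;code&gt;call_with_op_kwargs&lt;/code&gt;, the stand-in function &lt;code&gt;describe_compose&lt;/code&gt;, and the bucket and file names are all hypothetical.&lt;/p&gt;

```python
from typing import Any, Callable, Dict


def call_with_op_kwargs(python_callable: Callable[..., Any],
                        op_kwargs: Dict[str, Any]) -> Any:
    """Call the function with the dictionary unpacked as keyword arguments."""
    return python_callable(**op_kwargs)


def describe_compose(bucket_name: str, source_object_prefix: str,
                     destination_object: str, gcp_conn_id: str) -> str:
    """Hypothetical stand-in for compose_files_into_one; it only builds a message."""
    return (f"compose gs://{bucket_name}/{source_object_prefix}* "
            f"into gs://{bucket_name}/{destination_object} via {gcp_conn_id}")


message = call_with_op_kwargs(
    describe_compose,
    {
        "bucket_name": "my-bucket",               # made-up bucket name
        "source_object_prefix": "crypto_export",  # made-up file name
        "destination_object": "crypto_export.csv.gz",
        "gcp_conn_id": "gcp_connection_id",
    },
)
print(message)
```

&lt;p&gt;Because the keys are matched by name, every key in &lt;code&gt;op_kwargs&lt;/code&gt; has to correspond to a parameter of the callable.&lt;/p&gt;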

&lt;p&gt;As the Python Operator calls &lt;code&gt;compose_files_into_one&lt;/code&gt;, all the magic happens there. &lt;code&gt;compose_files_into_one&lt;/code&gt; is a function that contains all the hook logic. It uses the &lt;code&gt;GCSHook&lt;/code&gt; as a client to list all the objects with the given prefix. Then it composes the partition files into one gzip file.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.providers.google.cloud.hooks.gcs&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;GCSHook&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;compose_files_into_one&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                           &lt;span class="n"&gt;source_object_prefix&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                           &lt;span class="n"&gt;destination_object&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                           &lt;span class="n"&gt;gcp_conn_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;'''&lt;/span&gt;&lt;span class="s"&gt;Composes wildcarded files into one in the given destination&lt;/span&gt;&lt;span class="sh"&gt;'''&lt;/span&gt;
    &lt;span class="n"&gt;gcs_hook&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;GCSHook&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;gcp_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;gcp_conn_id&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;list_of_objects&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;gcs_hook&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;prefix&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;source_object_prefix&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;gcs_hook&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;compose&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;source_objects&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;list_of_objects&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;destination_object&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;destination_object&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The compose operation can be summarised with the following chart:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcrfh6qp6nd5a2il1ka4q.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcrfh6qp6nd5a2il1ka4q.png" alt="Compose operation"&gt;&lt;/a&gt;&lt;/p&gt;
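
&lt;p&gt;One caveat worth sketching: to my knowledge, a single GCS compose request accepts at most 32 source objects, so an export with more partitions would have to be composed in batches. The snippet below is a minimal plain-Python sketch of that batching step only. The helper &lt;code&gt;plan_compose_batches&lt;/code&gt; and the object names are hypothetical and not part of the example DAG, and each batch would still need its own &lt;code&gt;gcs_hook.compose&lt;/code&gt; call.&lt;/p&gt;

```python
# Assumption: GCS allows at most 32 source objects per compose request.
GCS_COMPOSE_LIMIT = 32


def plan_compose_batches(object_names, limit=GCS_COMPOSE_LIMIT):
    """Split object names into consecutive batches of at most `limit` items."""
    ordered = sorted(object_names)
    return [ordered[i:i + limit] for i in range(0, len(ordered), limit)]


# Hypothetical partition names, zero-padded so that sorting keeps their order.
partitions = [f"export-{n:012d}.csv.gz" for n in range(70)]
batches = plan_compose_batches(partitions)
print([len(batch) for batch in batches])
```

&lt;p&gt;With 70 partitions this yields batches of 32, 32, and 6 objects. Composing each batch into an intermediate object and then composing the intermediates is one way to stay under the limit.&lt;/p&gt;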

&lt;h3&gt;
  
  
  3. Delete Redundant Objects:
&lt;/h3&gt;

&lt;p&gt;At the end of the compose task, the GCS bucket holds both the partitioned files and the composed gzip file, so the same data is stored twice: once as multiple partitions and once as a single compact gzip file.&lt;/p&gt;

&lt;p&gt;As a conscientious developer, I want to delete the leftover partitions from the bucket and keep only the compact gzip file. Thus, I use the GCS Delete Objects Operator to create the last task of my example DAG.&lt;/p&gt;

&lt;p&gt;The GCS Delete Objects Operator does not require much configuration. It needs a &lt;code&gt;bucket_name&lt;/code&gt;, a &lt;code&gt;gcp_conn_id&lt;/code&gt;, and a &lt;code&gt;prefix&lt;/code&gt; parameter. The &lt;code&gt;prefix&lt;/code&gt; parameter selects the objects to be deleted. So, if the partition names follow the pattern &lt;code&gt;&amp;lt;defined_text_file_name&amp;gt;-&amp;lt;partition_number&amp;gt;&lt;/code&gt;, then we can use &lt;code&gt;&amp;lt;defined_text_file_name&amp;gt;-&lt;/code&gt; as the prefix to select all the partitions to be deleted.&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.models&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;DAG&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.providers.google.cloud.operators.gcs&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;GCSDeleteObjectsOperator&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.utils.dates&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;days_ago&lt;/span&gt;

&lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;DAG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcp_dag&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;schedule_interval&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;start_date&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;days_ago&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;tags&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;example&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;

    &lt;span class="c1"&gt;# Previous operations&lt;/span&gt;

    &lt;span class="n"&gt;delete_combined_objects&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;GCSDeleteObjectsOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcs_combined_files_delete&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;gcp_conn_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;gcp_connection_id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;bucket_name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;DATA_EXPORT_BUCKET_NAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;prefix&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;EXPECTED_FILE_NAME&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;-&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# DAG definition (dependency)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
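
&lt;p&gt;To make the prefix selection concrete, here is a minimal sketch in plain Python that mimics which objects such a prefix would match. The helper &lt;code&gt;select_by_prefix&lt;/code&gt; and the object names are hypothetical; the operator performs the actual listing and deletion against GCS.&lt;/p&gt;

```python
def select_by_prefix(object_names, prefix):
    """Return the object names that a prefix filter would match."""
    return [name for name in object_names if name.startswith(prefix)]


# Hypothetical bucket listing: three partitions plus the composed file.
EXPECTED_FILE_NAME = "weather_export"
bucket_objects = [
    "weather_export-000000000000.csv.gz",
    "weather_export-000000000001.csv.gz",
    "weather_export-000000000002.csv.gz",
    "weather_export.csv.gz",
]

# The trailing dash keeps the composed file out of the selection.
to_delete = select_by_prefix(bucket_objects, f"{EXPECTED_FILE_NAME}-")
print(to_delete)
```

&lt;p&gt;The trailing dash matters in this sketch: without it, the prefix would also match the composed file, and the task would delete the very object we want to keep.&lt;/p&gt;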
&lt;h2&gt;
  
  
  Last Words
&lt;/h2&gt;

&lt;p&gt;GCP operators in Airflow are quite extendable and lightweight, and they require a small amount of configuration. Most of the operators are well-fitting for the use cases that I am able to think of. This write-up is a result of my appreciation of this nicely evolved providers package of Airflow. With every release of the providers' packages, yet another shortcut is added in the lives of data programmers. This is another good example of how being aware of those shortcuts can make our workflows more reliable and maintainable.&lt;/p&gt;

&lt;p&gt;To have a look at the whole example DAG, with its Docker Compose setup, you can refer to &lt;a href="https://github.com/nazliander/gcp-airflow-basic-operations" rel="noopener noreferrer"&gt;its GitHub repository&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;(*)&lt;a&gt;&lt;/a&gt; This reminds me a lot of the godly boons I got when playing &lt;a href="https://hades.fandom.com/wiki/Hades_(game)" rel="noopener noreferrer"&gt;Hades&lt;/a&gt;. Boons are limited upgrades that the gods randomly grant to the protagonist, Zagreus, on his journeys. Those upgrades make Zagreus' life easier for some time, just as the Google Cloud Platform (GCP) operators in Airflow are boons that make our lives easier for a while.&lt;/p&gt;

</description>
      <category>googlecloud</category>
      <category>devops</category>
      <category>python</category>
      <category>database</category>
    </item>
    <item>
      <title>Using Pydantic as a Parsing and Data Validation Tool</title>
      <dc:creator>Nazli Ander</dc:creator>
      <pubDate>Wed, 16 Dec 2020 20:17:42 +0000</pubDate>
      <link>https://forem.com/nazliander/using-pydantic-as-a-parser-and-data-validation-tool-51n3</link>
      <guid>https://forem.com/nazliander/using-pydantic-as-a-parser-and-data-validation-tool-51n3</guid>
      <description>&lt;p&gt;&lt;a href="https://pydantic-docs.helpmanual.io/" rel="noopener noreferrer"&gt;Pydantic&lt;/a&gt; provides a &lt;code&gt;BaseModel&lt;/code&gt;, which can be subclassed to define typed fields for data modeling. It supports &lt;code&gt;Enum&lt;/code&gt; types, JSON conversion configurations, and even URL string parsing.&lt;/p&gt;

&lt;p&gt;Of course, we need a reason to use all those nice features. Since every homemade project has its storyline, I created a database use case built on NASA's APIs. First, I will briefly explain the use case, and then the concepts that I experimented with in my toy project.&lt;/p&gt;

&lt;h3&gt;
  
  
  Introduction
&lt;/h3&gt;

&lt;p&gt;NASA has a bunch of cool and &lt;a href="https://api.nasa.gov/" rel="noopener noreferrer"&gt;publicly available APIs&lt;/a&gt;. All we need to do as developers is request an API key. Then we can request data about NASA's latest innovations, the pictures of the day, or the space weather notifications.&lt;/p&gt;

&lt;p&gt;For this application, I chose the space weather notifications (DONKI), because they have some text and &lt;code&gt;datetime&lt;/code&gt; fields that I could make use of in this parsing case.&lt;/p&gt;

&lt;p&gt;I wanted to parse the requested notification data nicely with Pydantic and then insert it into a document database (MongoDB).&lt;/p&gt;

&lt;p&gt;A raw response looks like the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "messageType":"RBE",
    "messageID":"20201007-AL-001",
    "messageURL":"https://kauai.ccmc.gsfc.nasa.gov/DONKI/view/Alert/15920/1",
    "messageIssueTime":"2020-10-07T17:18Z",
    "messageBody":"## NASA Goddard Space Flight Center, Space Weather Research Center ( SWRC )\n## Message Type: Space Weather Notification - Radiation Belt Enhancement\n##\n## Message Issue Date: 2020-10-07T17:18:51Z\n## Message ID: 20201007-AL-001\n##\n## Disclaimer: NOAA's Space Weather Prediction Center (http://swpc.noaa.gov) is the United States Government official source for space weather forecasts. This \"Experimental Research Information\" consists of preliminary NASA research products and should be interpreted and used accordingly.\n\n\n## Summary:\n\nSignificantly elevated energetic electron fluxes in the Earth's outer radiation belt. GOES \"greater than 2.0 MeV\" integral electron flux is above 1000 pfu starting at 2020-10-07T14:05Z. \n\nThe elevated energetic electron flux levels are caused by an S-type CME with ID 2020-09-30T12:09:00-CME-001.\n\nNASA spacecraft at GEO, MEO and other orbits passing through or in the vicinity of the Earth's outer radiation belt can be impacted.\n\nActivity ID: 2020-10-07T14:05:00-RBE-001."
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The main goal of the toy project is to take this raw data and insert it into MongoDB, with all the mandatory fields present and in the correct data formats. An example record is expected to look like the one below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{ 
    "_id" : ObjectId("5fd917b83ecc12560ee43ef1"),
    "insertion_date" : ISODate("2020-12-15T20:08:23.091Z"),
    "message_type_abbreviation" : "RBE",
    "message_url" : "https://kauai.ccmc.gsfc.nasa.gov/DONKI/view/Alert/15920/1",
    "message_body" : 
        {
            "message_type" : "Space Weather Notification - Radiation Belt Enhancement",
            "message_issue_date" : ISODate("2020-10-07T17:18:51Z"),
            "message_id" : "20201007-AL-001",
            "disclaimer" : "NOAA's Space Weather Prediction Center (http://swpc.noaa.gov) is the United States Government official source for space weather forecasts. This \"Experimental Research Information\" consists of preliminary NASA research products and should be interpreted and used accordingly.",
            "summary" : "Significantly elevated energetic electron fluxes in the Earth's outer radiation belt. GOES \"greater than 2.0 MeV\" integral electron flux is above 1000 pfu starting at 2020-10-07T14:05Z. The elevated energetic electron flux levels are caused by an S-type CME with ID 2020-09-30T12:09:00-CME-001. NASA spacecraft at GEO, MEO and other orbits passing through or in the vicinity of the Earth's outer radiation belt can be impacted. Activity ID: 2020-10-07T14:05:00-RBE-001.",
            "notes" : null
        }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Some Remarks About the Dataset:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Message Type can be only one of the following categories: FLR, SEP, CME, IPS, MPC, GST, RBE, and Report.&lt;/li&gt;
&lt;li&gt;Message Body has text fields separated with the following characters: &lt;code&gt;\n##&lt;/code&gt;.&lt;/li&gt;
&lt;/ul&gt;
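
&lt;p&gt;The second remark suggests a simple way to pre-process the body before validation. The following is a minimal sketch, assuming every field starts after a &lt;code&gt;\n##&lt;/code&gt; separator and has the form &lt;code&gt;Name: value&lt;/code&gt;. The helper &lt;code&gt;split_message_body&lt;/code&gt; and the shortened sample text are hypothetical, not the exact code of the project.&lt;/p&gt;

```python
def split_message_body(message_body):
    """Split a notification body into snake_case keyed text fields."""
    fields = {}
    for chunk in message_body.split("\n##"):
        # The very first chunk still carries its leading "##".
        key, separator, value = chunk.lstrip("#").partition(":")
        if not separator:
            # Chunks without a colon (header line, empty lines) hold no field.
            continue
        name = "_".join(key.split()).lower()
        # Collapse newlines and repeated spaces into single spaces.
        fields[name] = " ".join(value.split())
    return fields


# Shortened, hypothetical sample in the shape of the raw response above.
sample_body = (
    "## NASA Goddard Space Flight Center\n"
    "## Message Type: Space Weather Notification - Radiation Belt Enhancement\n"
    "##\n"
    "## Message Issue Date: 2020-10-07T17:18:51Z\n"
    "## Message ID: 20201007-AL-001\n"
    "##\n"
    "## Summary:\n\nElevated electron fluxes.\n\n"
    "Activity ID: 2020-10-07T14:05:00-RBE-001."
)
fields = split_message_body(sample_body)
print(fields)
```

&lt;p&gt;Only the first colon of a chunk separates the name from the value, so timestamps and URLs inside the value stay intact. A header line that happened to contain a colon would be picked up as a field, which is one reason to validate the result with a model afterwards.&lt;/p&gt;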

&lt;h3&gt;
  
  
  Some Nice Concepts From Pydantic
&lt;/h3&gt;

&lt;p&gt;Pydantic comes with a &lt;code&gt;BaseModel&lt;/code&gt;. With the &lt;code&gt;BaseModel&lt;/code&gt;, we can parse dictionaries into correctly typed fields, or change how a certain type is serialized when the &lt;code&gt;BaseModel&lt;/code&gt; is transformed into JSON.&lt;/p&gt;

&lt;p&gt;When dealing with nested data, working from the inside out tends to make our lives easier. Hence, the Message Body is a useful starting point for this use case. The following example models the Message Body. For type hints we can get help from the &lt;a href="https://docs.python.org/3/library/typing.html" rel="noopener noreferrer"&gt;typing&lt;/a&gt; module, for example to declare optional fields:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;typing&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pydantic&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;BaseModel&lt;/span&gt;


&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;NotificationMessageBody&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseModel&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;message_type&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;message_issue_date&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;
    &lt;span class="n"&gt;message_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;
    &lt;span class="n"&gt;disclaimer&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;summary&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="n"&gt;notes&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;Optional&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can transform a &lt;code&gt;NotificationMessageBody&lt;/code&gt; instance into JSON with the following code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;parsed_and_cleaned_message_body_dict&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message_type&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Space Weather Notification - Radiation Belt Enhancement&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message_issue_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2020-10-07T17:18:51Z&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;20201007-AL-001&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;disclaimer&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;NOAA&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s Space Weather Prediction Center (http://swpc.noaa.gov) is the United States Government official source for space weather forecasts. This &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt;Experimental Research Information&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt; consists of preliminary NASA research products and should be interpreted and used accordingly.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;summary&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Significantly elevated energetic electron fluxes in the Earth&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s outer radiation belt. GOES &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt;greater than 2.0 MeV&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt; integral electron flux is above 1000 pfu starting at 2020-10-07T14:05Z. The elevated energetic electron flux levels are caused by an S-type CME with ID 2020-09-30T12:09:00-CME-001. NASA spacecraft at GEO, MEO and other orbits passing through or in the vicinity of the Earth&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s outer radiation belt can be impacted. Activity ID: 2020-10-07T14:05:00-RBE-001.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="nc"&gt;NotificationMessageBody&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;parsed_and_cleaned_message_body_dict&lt;/span&gt;
    &lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This will print all the fields in a default format. The datetime field is first transformed from a string into a Python datetime, then the default JSON transformation outputs it as an &lt;a href="https://en.wikipedia.org/wiki/ISO_8601" rel="noopener noreferrer"&gt;ISO 8601&lt;/a&gt; datetime string. The &lt;code&gt;notes&lt;/code&gt; field has no input in the example, so the JSON transformation outputs it as &lt;code&gt;null&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "message_type": "Space Weather Notification - Radiation Belt Enhancement",
    "message_issue_date": "2020-10-07T17:18:51+00:00",
    "message_id": "20201007-AL-001",
    "disclaimer": "NOAA's Space Weather Prediction Center (http://swpc.noaa.gov) is the United States Government official source for space weather forecasts. This \"Experimental Research Information\" consists of preliminary NASA research products and should be interpreted and used accordingly.",
    "summary": "Significantly elevated energetic electron fluxes in the Earth's outer radiation belt. GOES \"greater than 2.0 MeV\" integral electron flux is above 1000 pfu starting at 2020-10-07T14:05Z. The elevated energetic electron flux levels are caused by an S-type CME with ID 2020-09-30T12:09:00-CME-001. NASA spacecraft at GEO, MEO and other orbits passing through or in the vicinity of the Earth's outer radiation belt can be impacted. Activity ID: 2020-10-07T14:05:00-RBE-001.",
    "notes": null
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Lastly, we can model the whole &lt;code&gt;NotificationMessage&lt;/code&gt;. We may choose to use an Enum to check that the message type abbreviation is one of the values defined in &lt;code&gt;MessageTypeAbbreviationEnum&lt;/code&gt;. Also, we might choose to use the &lt;code&gt;HttpUrl&lt;/code&gt; type from Pydantic. This ensures that the URL string uses the &lt;code&gt;HTTP&lt;/code&gt; or &lt;code&gt;HTTPS&lt;/code&gt; scheme. Besides, it breaks the URL into the &lt;a href="https://en.wikipedia.org/wiki/URL#Syntax" rel="noopener noreferrer"&gt;scheme&lt;/a&gt;, &lt;a href="https://en.wikipedia.org/wiki/Host_(network)" rel="noopener noreferrer"&gt;host&lt;/a&gt;, &lt;a href="https://en.wikipedia.org/wiki/Top-level_domain" rel="noopener noreferrer"&gt;tld&lt;/a&gt;, &lt;code&gt;host_type&lt;/code&gt;, and &lt;a href="https://en.wikipedia.org/wiki/URL#Syntax" rel="noopener noreferrer"&gt;path&lt;/a&gt; fields:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pydantic&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;BaseModel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;HttpUrl&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;enum&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Enum&lt;/span&gt;


&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MessageTypeAbbreviationEnum&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Enum&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;FLR&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;FLR&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="n"&gt;SEP&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SEP&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="n"&gt;CME&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;CME&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="n"&gt;IPS&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;IPS&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="n"&gt;MPC&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;MPC&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="n"&gt;GST&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;GST&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="n"&gt;RBE&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;RBE&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="n"&gt;Report&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Report&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;


&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;NotificationMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BaseModel&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;insertion_date&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;
    &lt;span class="n"&gt;message_type_abbreviation&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;MessageTypeAbbreviationEnum&lt;/span&gt;
    &lt;span class="n"&gt;message_url&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;HttpUrl&lt;/span&gt;
    &lt;span class="n"&gt;message_body&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;NotificationMessageBody&lt;/span&gt;

    &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Config&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;json_encoders&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;v&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;v&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;strftime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;%Y-%m-%d %H:%M:%S&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The example contains an additional configuration for the JSON encoders. Unlike in the &lt;code&gt;NotificationMessageBody&lt;/code&gt; example, the &lt;code&gt;json_encoders&lt;/code&gt; setting ensures that datetimes are formatted as &lt;code&gt;%Y-%m-%d %H:%M:%S&lt;/code&gt; during the JSON transformation.&lt;/p&gt;
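
&lt;p&gt;The difference between the default output and the configured one can be reproduced with the standard library alone. This is a minimal sketch of the two string formats, not of Pydantic's internals; it assumes Python 3.7 or later, where &lt;code&gt;%z&lt;/code&gt; accepts a trailing &lt;code&gt;Z&lt;/code&gt;.&lt;/p&gt;

```python
from datetime import datetime

raw_value = "2020-10-07T17:18:51Z"

# Parse the ISO 8601 string; the trailing "Z" is read as a UTC offset.
parsed = datetime.strptime(raw_value, "%Y-%m-%dT%H:%M:%S%z")

# ISO 8601 output, the shape of the default JSON transformation.
default_output = parsed.isoformat()

# The format used by the lambda in the json_encoders configuration.
custom_output = parsed.strftime("%Y-%m-%d %H:%M:%S")

print(default_output)
print(custom_output)
```

&lt;p&gt;Note that the custom format drops the UTC offset, so consumers of the JSON need to know that the values are in UTC.&lt;/p&gt;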

&lt;p&gt;An example input:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;parsed_and_cleaned_message_body_dict&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message_type&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Space Weather Notification - Radiation Belt Enhancement&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message_issue_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2020-10-07T17:18:51Z&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message_id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;20201007-AL-001&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;disclaimer&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;NOAA&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s Space Weather Prediction Center (http://swpc.noaa.gov) is the United States Government official source for space weather forecasts. This &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt;Experimental Research Information&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt; consists of preliminary NASA research products and should be interpreted and used accordingly.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;summary&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Significantly elevated energetic electron fluxes in the Earth&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s outer radiation belt. GOES &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt;greater than 2.0 MeV&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s"&gt; integral electron flux is above 1000 pfu starting at 2020-10-07T14:05Z. The elevated energetic electron flux levels are caused by an S-type CME with ID 2020-09-30T12:09:00-CME-001. NASA spacecraft at GEO, MEO and other orbits passing through or in the vicinity of the Earth&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;s outer radiation belt can be impacted. Activity ID: 2020-10-07T14:05:00-RBE-001.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;parsed_and_cleaned_notification_message_dict&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;insertion_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2020-12-15T20:08:23.091Z&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message_type_abbreviation&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;RBE&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message_url&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;https://kauai.ccmc.gsfc.nasa.gov/DONKI/view/Alert/15920/1&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message_body&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;parsed_and_cleaned_message_body_dict&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="nc"&gt;NotificationMessage&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;parsed_and_cleaned_notification_message_dict&lt;/span&gt;
    &lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The example gives the following output. The datetime fields (&lt;code&gt;insertion_date&lt;/code&gt; and &lt;code&gt;message_issue_date&lt;/code&gt;) use the datetime format given in the configuration class (&lt;code&gt;%Y-%m-%d %H:%M:%S&lt;/code&gt;), so &lt;code&gt;2020-12-15T20:08:23.091Z&lt;/code&gt; becomes &lt;code&gt;2020-12-15 20:08:23&lt;/code&gt;. All the &lt;code&gt;message_body&lt;/code&gt; fields are parsed as the &lt;code&gt;NotificationMessageBody&lt;/code&gt; model suggests:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "insertion_date": "2020-12-15 20:08:23",
    "message_type_abbreviation": "RBE",
    "message_url": "https://kauai.ccmc.gsfc.nasa.gov/DONKI/view/Alert/15920/1",
    "message_body": 
        {
            "message_type": "Space Weather Notification - Radiation Belt Enhancement",
            "message_issue_date": "2020-10-07 17:18:51",
            "message_id": "20201007-AL-001",
            "disclaimer": "NOAA's Space Weather Prediction Center (http://swpc.noaa.gov) is the United States Government official source for space weather forecasts. This \"Experimental Research Information\" consists of preliminary NASA research products and should be interpreted and used accordingly.",
            "summary": "Significantly elevated energetic electron fluxes in the Earth's outer radiation belt. GOES \"greater than 2.0 MeV\" integral electron flux is above 1000 pfu starting at 2020-10-07T14:05Z. The elevated energetic electron flux levels are caused by an S-type CME with ID 2020-09-30T12:09:00-CME-001. NASA spacecraft at GEO, MEO and other orbits passing through or in the vicinity of the Earth's outer radiation belt can be impacted. Activity ID: 2020-10-07T14:05:00-RBE-001.",
            "notes": null
        }
}      
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
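
&lt;p&gt;The datetime reformatting visible in &lt;code&gt;insertion_date&lt;/code&gt; can be reproduced with the standard library alone. The snippet below is only a minimal sketch of that conversion, not the Pydantic configuration used in the project; the function name &lt;code&gt;reformat_timestamp&lt;/code&gt; and the constant names are assumptions made for illustration.&lt;/p&gt;

```python
from datetime import datetime

# Input format of the raw API timestamp and the output format from the
# configuration class. Both constant names are illustrative.
API_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
OUTPUT_FORMAT = "%Y-%m-%d %H:%M:%S"


def reformat_timestamp(raw):
    """Parse a UTC timestamp with milliseconds and render it without them."""
    parsed = datetime.strptime(raw, API_FORMAT)
    return parsed.strftime(OUTPUT_FORMAT)


print(reformat_timestamp("2020-12-15T20:08:23.091Z"))  # 2020-12-15 20:08:23
```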



&lt;h3&gt;
  
  
  Insertion into the DB
&lt;/h3&gt;

&lt;p&gt;After parsing and validating the nested dictionaries against the &lt;code&gt;NotificationMessage&lt;/code&gt; model, one can insert the notifications into a database. A document database is a natural fit, as its documents have a structure similar to JSON. The project uses MongoDB.&lt;/p&gt;

&lt;p&gt;The popular package for MongoDB, &lt;a href="https://pymongo.readthedocs.io/en/stable/tutorial.html" rel="noopener noreferrer"&gt;pymongo&lt;/a&gt;, has a convenient method for inserting many records in one batch. Unsurprisingly, it is called &lt;code&gt;insert_many&lt;/code&gt;, and it expects a list of dictionaries. For this purpose I used the dictionary conversion method from Pydantic (&lt;code&gt;.dict()&lt;/code&gt;):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;notifications&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;donki_parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;create_message_dictionary&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;notifications_as_dict&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;map&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;lambda&lt;/span&gt; &lt;span class="n"&gt;n&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;n&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dict&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;notifications&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

&lt;span class="n"&gt;notifications_repository&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;NotificationsRepository&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;host&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;MONGO_HOST&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;port&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;MONGO_PORT&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;notifications_repository&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;insert_many&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;notifications_as_dict&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
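
&lt;p&gt;The &lt;code&gt;NotificationsRepository&lt;/code&gt; class itself is not shown here. As a rough idea of what such a wrapper could look like, the sketch below delegates to pymongo's &lt;code&gt;insert_many&lt;/code&gt;. It is an assumption for illustration, not the implementation from the repository: the constructor takes a collection object rather than a host and port, and the &lt;code&gt;from_host&lt;/code&gt; helper as well as the database and collection names are hypothetical.&lt;/p&gt;

```python
class NotificationsRepository:
    """Hypothetical thin wrapper around a pymongo collection."""

    def __init__(self, collection):
        # Any object exposing pymongo's Collection.insert_many works here.
        self.collection = collection

    @classmethod
    def from_host(cls, host, port, db_name="donki", collection_name="notifications"):
        # Imported lazily so the class can be exercised without the driver.
        # The database and collection names are made up for this sketch.
        from pymongo import MongoClient

        client = MongoClient(host=host, port=port)
        return cls(client[db_name][collection_name])

    def insert_many(self, notifications):
        """Insert a batch of dictionaries and return how many were written."""
        if not notifications:
            # pymongo rejects an empty batch, so skip the call entirely.
            return 0
        result = self.collection.insert_many(notifications)
        return len(result.inserted_ids)
```

&lt;p&gt;Passing the collection in makes the wrapper easy to test with a stand-in object, while &lt;code&gt;from_host&lt;/code&gt; covers the host and port style of construction.&lt;/p&gt;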



&lt;h3&gt;
  
  
  Last Words
&lt;/h3&gt;

&lt;p&gt;It is enjoyable to learn more about data parsing and validation in Python, and Pydantic is a handy tool for this purpose. The examples that I gave require quite a bit of preprocessing, perhaps because of the example data API that I chose: the large number of free-text fields in the message body makes the data hard to parse. After solving this puzzle, Pydantic made sure that all fields are correct and ready to insert into a document database.&lt;/p&gt;

&lt;p&gt;You can read more about the functionality Pydantic provides &lt;a href="https://pydantic-docs.helpmanual.io/" rel="noopener noreferrer"&gt;in the Pydantic documentation&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;For the whole project, please refer to &lt;a href="https://github.com/nazliander/pydantic-parsing-example" rel="noopener noreferrer"&gt;the GitHub repository.&lt;/a&gt;&lt;/p&gt;

</description>
      <category>python</category>
      <category>programming</category>
      <category>database</category>
      <category>mongodb</category>
    </item>
    <item>
      <title>Creating a Development Environment for Spark Structured Streaming, Kafka, and Prometheus</title>
      <dc:creator>Nazli Ander</dc:creator>
      <pubDate>Sun, 04 Oct 2020 19:53:22 +0000</pubDate>
      <link>https://forem.com/nazliander/creating-a-development-environment-for-spark-structured-streaming-kafka-and-prometheus-29dl</link>
      <guid>https://forem.com/nazliander/creating-a-development-environment-for-spark-structured-streaming-kafka-and-prometheus-29dl</guid>

&lt;p&gt;A few months ago, I created a demo application using &lt;a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html" rel="noopener noreferrer"&gt;Spark Structured Streaming&lt;/a&gt;, &lt;a href="https://kafka.apache.org/" rel="noopener noreferrer"&gt;Kafka&lt;/a&gt;, and &lt;a href="https://prometheus.io/" rel="noopener noreferrer"&gt;Prometheus&lt;/a&gt; within the same Docker-compose file. One can extend this list with an additional &lt;a href="https://grafana.com/" rel="noopener noreferrer"&gt;Grafana&lt;/a&gt; service. The codebase was in Python, and I was ingesting live crypto-currency prices into Kafka and consuming them through Spark Structured Streaming. In this write-up, instead of talking about &lt;a href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking" rel="noopener noreferrer"&gt;the Watermarks&lt;/a&gt; and sink types in Spark Structured Streaming, I will only talk about Docker-compose and how I set up my development environment using Spark, Kafka, Prometheus, and Zookeeper. For the whole codebase of my demo project, please refer &lt;a href="https://github.com/nazliander/crypto-currency-streaming-prices" rel="noopener noreferrer"&gt;to the GitHub repository&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Service Blocks
&lt;/h2&gt;

&lt;p&gt;In the Docker-compose file, I needed the following services to keep my streaming data producer and consumer live and, at the same time, monitor the ingestion into Kafka:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Spark standalone cluster:&lt;/strong&gt; Consisting of one master and one worker node

&lt;ul&gt;
&lt;li&gt;Spark-master&lt;/li&gt;
&lt;li&gt;Spark-worker&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;

&lt;strong&gt;Zookeeper:&lt;/strong&gt; A requirement for Kafka (&lt;a href="https://www.confluent.io/blog/removing-zookeeper-dependency-in-kafka/" rel="noopener noreferrer"&gt;soon it will not be a requirement&lt;/a&gt;) to maintain the brokers and topics. For instance, if a broker joins or dies, Zookeeper informs the cluster.&lt;/li&gt;

&lt;li&gt;

&lt;strong&gt;Kafka:&lt;/strong&gt; A Message-oriented Middleware (MoM) for dealing with large streams of data. In this case, we have streams of crypto-currency prices.&lt;/li&gt;

&lt;li&gt;

&lt;strong&gt;Prometheus-JMX-Exporter:&lt;/strong&gt; An exporter that connects to Java Management Extensions (JMX) and translates the metrics into a format that Prometheus can understand. Since Kafka is a Java application, this is the magic service that enables us to scrape Kafka metrics automatically.&lt;/li&gt;

&lt;li&gt;

&lt;strong&gt;Prometheus:&lt;/strong&gt; A time-series database for metrics and a modern alerting tool.&lt;/li&gt;

&lt;/ul&gt;

&lt;h3&gt;
  
  
  Spark Services
&lt;/h3&gt;

&lt;p&gt;In the most basic setup for the standalone Spark cluster, we need one master and one worker node. You can use Docker-compose volumes for mounting folders. For Spark, perhaps the most common mounting reason is sharing the connectors (&lt;code&gt;.jar&lt;/code&gt; files) or scripts.&lt;/p&gt;

&lt;p&gt;For the Spark images, I preferred the ones that &lt;a href="https://www.big-data-europe.eu/" rel="noopener noreferrer"&gt;Big Data Europe&lt;/a&gt; publishes on Docker Hub, as they maintain a very stable and extensive set of Spark Hadoop images. This also saved some redundant work, like creating a separate Dockerfile per Spark node.&lt;/p&gt;

&lt;p&gt;I needed to take care of the networking within the Docker-compose settings. Hence, I created a bridge network with the custom name "crypto-network". The bridge network lets the standalone containers communicate with each other. For more information about the different network drivers in Docker, please refer &lt;a href="https://docs.docker.com/network/" rel="noopener noreferrer"&gt;to the Docker documentation,&lt;/a&gt; which is fun to read. While setting up, I forwarded the Web UIs to host ports other than 8080 (8082 for the master and 8083 for the worker) to prevent conflicts with the JMX-Exporter. Besides, I made the worker node depend on the master node to fix the order of container creation.&lt;/p&gt;

&lt;p&gt;Lastly, following the BDE example, I set &lt;code&gt;SPARK_MASTER&lt;/code&gt; on the worker through an environment variable. Here I am sharing the Spark component of the demo application.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="nn"&gt;---&lt;/span&gt;
&lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;3.2"&lt;/span&gt;
&lt;span class="na"&gt;services&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;

  &lt;span class="na"&gt;spark-master&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;bde2020/spark-master:2.2.2-hadoop2.7&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;spark-master&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;crypto-network&lt;/span&gt;
    &lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./connectors:/connectors&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./:/scripts/&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;8082:8080&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;7077:7077&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;INIT_DAEMON_STEP=false&lt;/span&gt;

  &lt;span class="na"&gt;spark-worker-1&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;bde2020/spark-worker:2.2.2-hadoop2.7&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;spark-worker-1&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;crypto-network&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;spark-master&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;8083:8081&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SPARK_MASTER=spark://spark-master:7077"&lt;/span&gt;


&lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;crypto-network&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;driver&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;bridge"&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can start the services with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker-compose up
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then you can see the Spark master node setup with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; spark-master bash
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
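
&lt;p&gt;Once the containers are running, the forwarded host ports from the compose file (8082 and 7077 for the master, 8083 for the worker) can also be checked from the host machine. The following is a minimal sketch using only the Python standard library; the helper name &lt;code&gt;port_is_open&lt;/code&gt; and the dictionary labels are made up for illustration.&lt;/p&gt;

```python
import socket

# Host ports forwarded in the compose file above; the labels are illustrative.
SPARK_PORTS = {
    "spark-master web UI": 8082,
    "spark-master RPC": 7077,
    "spark-worker-1 web UI": 8083,
}


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    for name, port in SPARK_PORTS.items():
        status = "reachable" if port_is_open("localhost", port) else "not reachable"
        print(f"{name} (localhost:{port}): {status}")
```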



&lt;h3&gt;
  
  
  Kafka Services
&lt;/h3&gt;

&lt;p&gt;To run Kafka in a standalone mode, I needed Zookeeper and Kafka itself with some fancy environment variables. Basically, Kafka needs to find the Zookeeper client port and it needs to advertise the correct ports to Spark applications.&lt;/p&gt;

&lt;p&gt;To run this setting I used the &lt;a href="https://docs.confluent.io/current/installation/docker/installation/index.html" rel="noopener noreferrer"&gt;Confluent&lt;/a&gt; images. Here, I am sharing the Kafka related services block. A Confluent image already allows us to set up:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Kafka topics by using the environment variables:

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;KAFKA_CREATE_TOPICS&lt;/code&gt;: Topic names to be created&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;KAFKA_AUTO_CREATE_TOPICS_ENABLE&lt;/code&gt;: Whether the broker creates a topic automatically when it is first used&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR&lt;/code&gt;: Replication factor of the internal offsets topic (1 for a single broker)&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;Connection to Zookeeper using the environment variable &lt;code&gt;KAFKA_ZOOKEEPER_CONNECT&lt;/code&gt;
&lt;/li&gt;

&lt;li&gt;A custom broker id for a particular node using &lt;code&gt;KAFKA_BROKER_ID&lt;/code&gt;&lt;/li&gt;

&lt;li&gt;Advertising the correct ports for the docker network internal services or external connections:

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;KAFKA_INTER_BROKER_LISTENER_NAME&lt;/code&gt;: Listener name for the setup&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;KAFKA_LISTENER_SECURITY_PROTOCOL_MAP&lt;/code&gt;: Listener setup with mapping&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;KAFKA_ADVERTISED_LISTENERS&lt;/code&gt;: Listener setup for internal and external networking. This is a bit tricky: with the example below, a consumer or producer running inside the internal Docker network needs to connect to &lt;code&gt;kafka:29092&lt;/code&gt;, while from outside of Docker I can use a consumer or producer via &lt;code&gt;localhost:9092&lt;/code&gt;. For more information, &lt;a href="https://docs.confluent.io/current/kafka/multi-node.html" rel="noopener noreferrer"&gt;here is&lt;/a&gt; an awesome explanation.
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="nn"&gt;---&lt;/span&gt;
&lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;3.2"&lt;/span&gt;
&lt;span class="na"&gt;services&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;zookeeper&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;confluentinc/cp-zookeeper&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;zookeeper&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;crypto-network&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;ZOOKEEPER_CLIENT_PORT&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;2181&lt;/span&gt;
      &lt;span class="na"&gt;ZOOKEEPER_TICK_TIME&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;2000&lt;/span&gt;

  &lt;span class="na"&gt;kafka&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;confluentinc/cp-kafka&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;kafka&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;zookeeper&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;crypto-network&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;9092:9092&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;30001:30001&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_CREATE_TOPICS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;crypto_raw,crypto_latest_trends,crypto_moving_average&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_BROKER_ID&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_ZOOKEEPER_CONNECT&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;zookeeper:2181&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_LISTENER_SECURITY_PROTOCOL_MAP&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_INTER_BROKER_LISTENER_NAME&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;PLAINTEXT&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_ADVERTISED_LISTENERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_AUTO_CREATE_TOPICS_ENABLE&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;true"&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;100&lt;/span&gt;

&lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;crypto-network&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;driver&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;bridge"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
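
&lt;p&gt;To make the two advertised listeners more tangible, the sketch below parses the &lt;code&gt;KAFKA_ADVERTISED_LISTENERS&lt;/code&gt; value from the compose file and picks the address a client should use depending on where it runs. It is plain string handling, not part of any Kafka client library, and the function names are assumptions made for illustration.&lt;/p&gt;

```python
def parse_advertised_listeners(value):
    """Map each listener name to its advertised host:port."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        listeners[name] = address
    return listeners


def bootstrap_server(value, inside_docker):
    """Pick the address a client should use, depending on where it runs."""
    listeners = parse_advertised_listeners(value)
    return listeners["PLAINTEXT"] if inside_docker else listeners["PLAINTEXT_HOST"]


# Value taken from the compose file above.
ADVERTISED = "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"

print(bootstrap_server(ADVERTISED, inside_docker=True))   # kafka:29092
print(bootstrap_server(ADVERTISED, inside_docker=False))  # localhost:9092
```

&lt;p&gt;A Spark job running in the &lt;code&gt;crypto-network&lt;/code&gt; would use the first address, while a producer started directly on the host would use the second one.&lt;/p&gt;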



&lt;h3&gt;
  
  
  Prometheus Services
&lt;/h3&gt;

&lt;p&gt;In this project, I wanted to scrape Kafka's metrics automatically. Hence, apart from the Prometheus service itself, I also needed the JMX-Exporter. And I realized that it is the coolest kid in a Docker-compose file.&lt;/p&gt;

&lt;p&gt;For both Prometheus and the JMX-Exporter, I needed to use custom Dockerfiles, as they require some templates to be aware of each other. I used a separate &lt;code&gt;./tools/&lt;/code&gt; folder to keep my monitoring-related settings. Within &lt;code&gt;./tools/prometheus-jmx-exporter&lt;/code&gt;, I had a &lt;code&gt;confd&lt;/code&gt; folder to configure the Docker containers &lt;a href="http://www.mricho.com/confd-and-docker-separating-config-and-code-for-containers/" rel="noopener noreferrer"&gt;at run-time.&lt;/a&gt; The file structure is as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;.
├── prometheus
│   ├── Dockerfile
│   └── prometheus.yml
└── prometheus-jmx-exporter
    ├── Dockerfile
    ├── confd
    │   ├── conf.d
    │   │   ├── kafka.yml.toml
    │   │   └── start-jmx-scraper.sh.toml
    │   └── templates
    │       ├── kafka.yml.tmpl
    │       └── start-jmx-scraper.sh.tmpl
    └── entrypoint.sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's start with the Prometheus image as it is more straightforward. We need to use a custom Dockerfile to get the config with custom scraper settings.&lt;/p&gt;

&lt;p&gt;The Dockerfile will be:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight docker"&gt;&lt;code&gt;&lt;span class="k"&gt;FROM&lt;/span&gt;&lt;span class="s"&gt; prom/prometheus:v2.8.1&lt;/span&gt;

&lt;span class="k"&gt;ADD&lt;/span&gt;&lt;span class="s"&gt; ./prometheus.yml /etc/prometheus/prometheus.yml&lt;/span&gt;

&lt;span class="k"&gt;CMD&lt;/span&gt;&lt;span class="s"&gt; [ "--config.file=/etc/prometheus/prometheus.yml","--web.enable-admin-api" ]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;prometheus.yml&lt;/code&gt; contains the following, with a scrape interval of 5 seconds. In &lt;code&gt;prometheus.yml&lt;/code&gt;, Prometheus targets a service called &lt;code&gt;kafka-jmx-exporter&lt;/code&gt; on port &lt;code&gt;8080&lt;/code&gt;. Hence, in the Docker-compose file, the JMX-Exporter service has to carry the same name as the targeted service.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight toml"&gt;&lt;code&gt;&lt;span class="err"&gt;global:&lt;/span&gt;
  &lt;span class="err"&gt;scrape_interval:&lt;/span&gt;     &lt;span class="err"&gt;5s&lt;/span&gt;
  &lt;span class="err"&gt;evaluation_interval:&lt;/span&gt; &lt;span class="err"&gt;5s&lt;/span&gt;

&lt;span class="err"&gt;scrape_configs:&lt;/span&gt;
  &lt;span class="err"&gt;-&lt;/span&gt; &lt;span class="err"&gt;job_name:&lt;/span&gt; &lt;span class="err"&gt;'kafka'&lt;/span&gt;
    &lt;span class="err"&gt;scrape_interval:&lt;/span&gt; &lt;span class="err"&gt;5s&lt;/span&gt;
    &lt;span class="err"&gt;static_configs:&lt;/span&gt;
      &lt;span class="err"&gt;-&lt;/span&gt; &lt;span class="err"&gt;targets:&lt;/span&gt; &lt;span class="nn"&gt;['kafka-jmx-exporter:8080']&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To create the JMX-Exporter image, I needed more tweaks. Let's start with the Dockerfile. The image for the JMX-Exporter uses a Java base image. It then downloads the JMX Prometheus &lt;code&gt;.jar&lt;/code&gt; from the Maven repository and writes it to &lt;code&gt;/opt/jmx_prometheus_httpserver/jmx_prometheus_httpserver.jar&lt;/code&gt;. Next, it downloads Confd, stores it in &lt;code&gt;/usr/local/bin/confd&lt;/code&gt;, and gives it execute permissions. It also copies the &lt;code&gt;confd&lt;/code&gt; folder to &lt;code&gt;/etc/confd&lt;/code&gt;. Lastly, it copies &lt;code&gt;entrypoint.sh&lt;/code&gt; into &lt;code&gt;/opt/entrypoint.sh&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight docker"&gt;&lt;code&gt;&lt;span class="k"&gt;FROM&lt;/span&gt;&lt;span class="s"&gt; java:8&lt;/span&gt;

&lt;span class="k"&gt;RUN &lt;/span&gt;&lt;span class="nb"&gt;mkdir&lt;/span&gt; /opt/jmx_prometheus_httpserver &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; wget &lt;span class="s1"&gt;'https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_httpserver/0.11.0/jmx_prometheus_httpserver-0.11.0-jar-with-dependencies.jar'&lt;/span&gt; &lt;span class="nt"&gt;-O&lt;/span&gt; /opt/jmx_prometheus_httpserver/jmx_prometheus_httpserver.jar

&lt;span class="k"&gt;ADD&lt;/span&gt;&lt;span class="s"&gt; https://github.com/kelseyhightower/confd/releases/download/v0.16.0/confd-0.16.0-linux-amd64 /usr/local/bin/confd&lt;/span&gt;
&lt;span class="k"&gt;COPY&lt;/span&gt;&lt;span class="s"&gt; confd /etc/confd&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;&lt;span class="nb"&gt;chmod&lt;/span&gt; +x /usr/local/bin/confd

&lt;span class="k"&gt;COPY&lt;/span&gt;&lt;span class="s"&gt; entrypoint.sh /opt/entrypoint.sh&lt;/span&gt;
&lt;span class="k"&gt;ENTRYPOINT&lt;/span&gt;&lt;span class="s"&gt; ["/opt/entrypoint.sh"]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In &lt;code&gt;entrypoint.sh&lt;/code&gt;, I only execute Confd and then run &lt;code&gt;start-jmx-scraper.sh&lt;/code&gt;. Hence, after Confd renders the templates for both the Kafka and the JMX scraper files into their destinations, as configured in the &lt;code&gt;.toml&lt;/code&gt; files, we run the downloaded &lt;code&gt;jmx_prometheus_httpserver.jar&lt;/code&gt; file. The &lt;code&gt;entrypoint.sh&lt;/code&gt; looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/bin/bash&lt;/span&gt;
/usr/local/bin/confd &lt;span class="nt"&gt;-onetime&lt;/span&gt; &lt;span class="nt"&gt;-backend&lt;/span&gt; &lt;span class="nb"&gt;env&lt;/span&gt;
/opt/start-jmx-scraper.sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The template for &lt;code&gt;start-jmx-scraper.sh&lt;/code&gt; is as follows. The environment variables in Docker-compose define each of the keys (&lt;code&gt;JMX_PORT&lt;/code&gt;, &lt;code&gt;JMX_HOST&lt;/code&gt;, &lt;code&gt;HTTP_PORT&lt;/code&gt;, &lt;code&gt;JMX_EXPORTER_CONFIG_FILE&lt;/code&gt;) referenced in the command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/bin/bash&lt;/span&gt;
java &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-Dcom&lt;/span&gt;.sun.management.jmxremote.ssl&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;false&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-Djava&lt;/span&gt;.rmi.server.hostname&lt;span class="o"&gt;={{&lt;/span&gt; getv &lt;span class="s2"&gt;"/jmx/host"&lt;/span&gt; &lt;span class="o"&gt;}}&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-Dcom&lt;/span&gt;.sun.management.jmxremote.authenticate&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;false&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-Dcom&lt;/span&gt;.sun.management.jmxremote.port&lt;span class="o"&gt;={{&lt;/span&gt; getv &lt;span class="s2"&gt;"/jmx/port"&lt;/span&gt; &lt;span class="o"&gt;}}&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-jar&lt;/span&gt; /opt/jmx_prometheus_httpserver/jmx_prometheus_httpserver.jar &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="o"&gt;{{&lt;/span&gt; getv &lt;span class="s2"&gt;"/http/port"&lt;/span&gt; &lt;span class="o"&gt;}}&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    /opt/jmx_prometheus_httpserver/&lt;span class="o"&gt;{{&lt;/span&gt; getv &lt;span class="s2"&gt;"/jmx/exporter/config/file"&lt;/span&gt; &lt;span class="o"&gt;}}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
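
&lt;p&gt;With the &lt;code&gt;env&lt;/code&gt; backend, Confd looks up a key such as &lt;code&gt;/jmx/host&lt;/code&gt; in the environment variable &lt;code&gt;JMX_HOST&lt;/code&gt;. The sketch below imitates that lookup in Python to show how the template placeholders get filled. It is a simplified illustration that only handles &lt;code&gt;getv&lt;/code&gt; with a single argument, not a replacement for Confd; the function names are hypothetical, and the example values are the ones set for the exporter in the Docker-compose file.&lt;/p&gt;

```python
import re

# Matches a getv call in a template, e.g. {{ getv "/jmx/host" }}.
GETV_PATTERN = re.compile(r'\{\{\s*getv\s+"([^"]+)"\s*\}\}')


def key_to_env_name(key):
    """Translate a key such as /jmx/host into the variable name JMX_HOST."""
    return key.strip("/").replace("/", "_").upper()


def render(template, environment):
    """Replace every getv call with the value of the matching variable."""
    return GETV_PATTERN.sub(
        lambda match: environment[key_to_env_name(match.group(1))], template
    )


example_env = {"JMX_HOST": "kafka", "JMX_PORT": "30001"}
line = (
    '-Djava.rmi.server.hostname={{ getv "/jmx/host" }} '
    '-Dcom.sun.management.jmxremote.port={{ getv "/jmx/port" }}'
)
print(render(line, example_env))
# -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.port=30001
```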



&lt;p&gt;With the given custom Docker images for Prometheus automatically scraping Kafka, the full Docker-compose file for the demo project is as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="nn"&gt;---&lt;/span&gt;
&lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;3.2"&lt;/span&gt;
&lt;span class="na"&gt;services&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;zookeeper&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;confluentinc/cp-zookeeper&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;zookeeper&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;crypto-network&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;ZOOKEEPER_CLIENT_PORT&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;2181&lt;/span&gt;
      &lt;span class="na"&gt;ZOOKEEPER_TICK_TIME&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;2000&lt;/span&gt;

  &lt;span class="na"&gt;kafka&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;confluentinc/cp-kafka&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;kafka&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;zookeeper&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;crypto-network&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;9092:9092&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;30001:30001&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_CREATE_TOPICS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;crypto_raw,crypto_latest_trends,crypto_moving_average&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_BROKER_ID&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_ZOOKEEPER_CONNECT&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;zookeeper:2181&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_LISTENER_SECURITY_PROTOCOL_MAP&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_INTER_BROKER_LISTENER_NAME&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;PLAINTEXT&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_ADVERTISED_LISTENERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_AUTO_CREATE_TOPICS_ENABLE&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;true"&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;100&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_JMX_PORT&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;30001&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_JMX_HOSTNAME&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;kafka&lt;/span&gt;

  &lt;span class="na"&gt;kafka-jmx-exporter&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;./tools/prometheus-jmx-exporter&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;jmx-exporter&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;8080:8080&lt;/span&gt;
    &lt;span class="na"&gt;links&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;kafka&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;crypto-network&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;JMX_PORT&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;30001&lt;/span&gt;
      &lt;span class="na"&gt;JMX_HOST&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;kafka&lt;/span&gt;
      &lt;span class="na"&gt;HTTP_PORT&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;8080&lt;/span&gt;
      &lt;span class="na"&gt;JMX_EXPORTER_CONFIG_FILE&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;kafka.yml&lt;/span&gt;

  &lt;span class="na"&gt;prometheus&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;./tools/prometheus&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;prometheus&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;crypto-network&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;9090:9090&lt;/span&gt;

  &lt;span class="na"&gt;spark-master&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;bde2020/spark-master:2.2.2-hadoop2.7&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;spark-master&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;crypto-network&lt;/span&gt;
    &lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./connectors:/connectors&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./:/scripts/&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;8082:8080&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;7077:7077&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;INIT_DAEMON_STEP=setup_spark&lt;/span&gt;

  &lt;span class="na"&gt;spark-worker-1&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;bde2020/spark-worker:2.2.2-hadoop2.7&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;spark-worker-1&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;crypto-network&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;spark-master&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;8083:8081&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SPARK_MASTER=spark://spark-master:7077"&lt;/span&gt;

  &lt;span class="na"&gt;producer&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;context&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;.&lt;/span&gt;
      &lt;span class="na"&gt;dockerfile&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;./Dockerfile.producer&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;producer&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;kafka&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;crypto-network&lt;/span&gt;

&lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;crypto-network&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;driver&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;bridge"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



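&lt;p&gt;Because the Docker Compose file also contains a Producer service, messages start flowing as soon as we run the command below. We can then check the Kafka topic's messages per minute in the Prometheus UI at &lt;code&gt;&amp;lt;IP_LOCAL&amp;gt;:9090&lt;/code&gt;, the port mapped for the &lt;code&gt;prometheus&lt;/code&gt; service above:&lt;br&gt;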
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose up
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here the output of the Prometheus UI will be as follows:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fp2tw1obssuuuq1gyz59a.PNG" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fp2tw1obssuuuq1gyz59a.PNG" alt="Prometheus Web UI Example" width="800" height="358"&gt;&lt;/a&gt;&lt;/p&gt;
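
&lt;p&gt;Besides the web UI, Prometheus also answers queries over its HTTP API. The following is a minimal sketch, not part of the original project: it assumes Prometheus is reachable on &lt;code&gt;localhost:9090&lt;/code&gt; (taken from the port mapping in the compose file), and the helper names &lt;code&gt;build_query_url&lt;/code&gt; and &lt;code&gt;run_query&lt;/code&gt; are hypothetical.&lt;/p&gt;

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def build_query_url(base_url, promql):
    """Build an instant-query URL for the Prometheus HTTP API."""
    params = urlencode({"query": promql})
    return f"{base_url}/api/v1/query?{params}"


def run_query(base_url, promql):
    """Send the query and return the decoded JSON response."""
    with urlopen(build_query_url(base_url, promql), timeout=5) as response:
        return json.load(response)


# "up" is a built-in metric that reports whether each scrape target is reachable.
# The host and port are assumptions based on the compose file.
print(build_query_url("http://localhost:9090", "up"))
```

&lt;p&gt;Calling &lt;code&gt;run_query("http://localhost:9090", "up")&lt;/code&gt; while the stack is running should return a JSON document listing the scrape targets. The Kafka metric names depend on the JMX exporter configuration, so they are left out here.&lt;/p&gt;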

&lt;h3&gt;
  
  
  Last Words
&lt;/h3&gt;

&lt;p&gt;This was a demo project that I made to study watermarks and windowing functions in streaming data processing. For that, I needed to create a custom producer for Kafka and consume its messages using Spark Structured Streaming. The development phase of the project was super fun, and I also enjoyed creating this pretty long Docker Compose example.&lt;/p&gt;

&lt;p&gt;In case more detail is needed, I am sharing &lt;a href="https://github.com/nazliander/crypto-currency-streaming-prices" rel="noopener noreferrer"&gt;the GitHub repository&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>docker</category>
      <category>devops</category>
      <category>database</category>
      <category>testing</category>
    </item>
    <item>
      <title>Writing Custom Cross-Validation Methods For Grid Search in Scikit-learn</title>
      <dc:creator>Nazli Ander</dc:creator>
      <pubDate>Sat, 03 Oct 2020 11:29:21 +0000</pubDate>
      <link>https://forem.com/nazliander/writing-custom-cross-validation-methods-for-grid-search-in-scikit-learn-2o21</link>
      <guid>https://forem.com/nazliander/writing-custom-cross-validation-methods-for-grid-search-in-scikit-learn-2o21</guid>
      <description>&lt;p&gt;Recently I was interested in applying Blocking Time Series Split, following &lt;a href="https://hub.packtpub.com/cross-validation-strategies-for-time-series-forecasting-tutorial/" rel="noopener noreferrer"&gt;this lovely post&lt;/a&gt;, in a Grid Search hyper-parameter tuning setting with the scikit-learn library, to maintain the time order and prevent information leakage. In this post, I will try to document some knowledge that I built while reading through the articles, documentation, and blog posts about custom cross-validation generators in Python.&lt;/p&gt;

&lt;p&gt;It is great that scikit-learn provides a class called &lt;code&gt;TimeSeriesSplit&lt;/code&gt;; with it we can generate training and test sets over fixed-size time intervals. Here is a basic example using scikit-learn data generators. I generate a regression dataset with 5 features and 30 samples, and then 3 splits. Each split has at most 10 training examples (&lt;code&gt;max_train_size=10&lt;/code&gt;) and &lt;code&gt;n_samples//(n_splits + 1)&lt;/code&gt; test examples, which is 7 here:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;numpy&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sklearn.datasets&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;make_regression&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sklearn.model_selection&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;TimeSeriesSplit&lt;/span&gt;

&lt;span class="n"&gt;X_experiment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;y_experiment&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;make_regression&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;n_samples&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;n_features&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;noise&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;tscv&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;TimeSeriesSplit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;max_train_size&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;n_splits&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;idx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;y&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nf"&gt;enumerate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tscv&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;split&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;X_experiment&lt;/span&gt;&lt;span class="p"&gt;)):&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Split number: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;idx&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Training indices: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Test indices: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;y&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output follows a Walk Forward Cross Validation pattern:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Split number: 0
Training indices: [0 1 2 3 4 5 6 7 8]
Test indices: [ 9 10 11 12 13 14 15]

Split number: 1
Training indices: [ 6  7  8  9 10 11 12 13 14 15]
Test indices: [16 17 18 19 20 21 22]

Split number: 2
Training indices: [13 14 15 16 17 18 19 20 21 22]
Test indices: [23 24 25 26 27 28 29]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
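
&lt;p&gt;The sizes in this output can be reproduced with a few lines of index arithmetic. The following is a sketch under stated assumptions rather than scikit-learn's actual implementation: the helper name &lt;code&gt;time_series_split_indices&lt;/code&gt; is hypothetical, and it assumes the default test size and no gap between the training and test sets.&lt;/p&gt;

```python
def time_series_split_indices(n_samples, n_splits, max_train_size=None):
    """Yield (train, test) index lists in walk-forward order.

    Sketch of the index arithmetic only; assumes the default test size
    and no gap between the training and test sets.
    """
    test_size = n_samples // (n_splits + 1)
    first_test_start = n_samples - n_splits * test_size
    for test_start in range(first_test_start, n_samples, test_size):
        train_start = 0
        if max_train_size is not None:
            # Keep only the most recent max_train_size samples.
            train_start = max(0, test_start - max_train_size)
        train = list(range(train_start, test_start))
        test = list(range(test_start, test_start + test_size))
        yield train, test


for train, test in time_series_split_indices(30, 3, max_train_size=10):
    # Prints: 9 7, then 10 7, then 10 7
    print(len(train), len(test))
```

&lt;p&gt;The first split has only 9 training examples because just 9 samples precede the first test block; from the second split on, &lt;code&gt;max_train_size&lt;/code&gt; caps the training window at 10.&lt;/p&gt;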



&lt;p&gt;However, the setting I had used dates instead of timestamps. This led &lt;strong&gt;to discrete numeric values&lt;/strong&gt; as anchor points for cross-validation splits, &lt;strong&gt;instead of continuous ones&lt;/strong&gt;. Hence, I was not able to leverage the &lt;code&gt;TimeSeriesSplit&lt;/code&gt; from scikit-learn. Instead, I wrote a simple generator object with groupings for date splits to use in Grid Search.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
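&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;numpy&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pandas&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;


&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CustomCrossValidation&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;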

    &lt;span class="nd"&gt;@classmethod&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;split&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cls&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
              &lt;span class="n"&gt;X&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DataFrame&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
              &lt;span class="n"&gt;y&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ndarray&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
              &lt;span class="n"&gt;groups&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ndarray&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Returns a grouped time series split generator.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="k"&gt;assert&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;X&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;groups&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;  &lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Length of the predictors is not &lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
            &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;matching with the groups.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="c1"&gt;# Iterate over consecutive integer groups, oldest first
&lt;/span&gt;        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;group_idx&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nf"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;groups&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;min&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;groups&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;max&lt;/span&gt;&lt;span class="p"&gt;()):&lt;/span&gt;

            &lt;span class="n"&gt;training_group&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;group_idx&lt;/span&gt;
            &lt;span class="c1"&gt;# Gets the next group right after
&lt;/span&gt;            &lt;span class="c1"&gt;# the training as test
&lt;/span&gt;            &lt;span class="n"&gt;test_group&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;group_idx&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
            &lt;span class="n"&gt;training_indices&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;where&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;groups&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;training_group&lt;/span&gt;&lt;span class="p"&gt;)[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
            &lt;span class="n"&gt;test_indices&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;where&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;groups&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;test_group&lt;/span&gt;&lt;span class="p"&gt;)[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;test_indices&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="c1"&gt;# Yield the training and test indices
&lt;/span&gt;                &lt;span class="c1"&gt;# for the cross-validation generator
&lt;/span&gt;                &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;training_indices&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;test_indices&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;CustomCrossValidation&lt;/code&gt; is a simple class with one method (&lt;code&gt;split&lt;/code&gt;) that takes X (predictors), y (target values), and groups corresponding to the date groups. Those can be months or quarters in your dataset; I assumed that they can be mapped to integers that keep the order of time. Hence, if I have 3 quarters in the dataset, I first have &lt;code&gt;Q1&lt;/code&gt;, &lt;code&gt;Q2&lt;/code&gt;, and &lt;code&gt;Q3&lt;/code&gt; as date values, but I can simply map those to 0, 1, 2 to keep the order and use them in my validation generator class method.&lt;/p&gt;
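
&lt;p&gt;One way to build such integer groups from raw dates is sketched below. This is an illustration under assumptions, not code from the original project: the helper names &lt;code&gt;quarter_label&lt;/code&gt; and &lt;code&gt;encode_groups&lt;/code&gt; and the sample dates are hypothetical.&lt;/p&gt;

```python
from datetime import date


def quarter_label(day):
    """Return a sortable (year, quarter) pair for a date."""
    return (day.year, (day.month - 1) // 3 + 1)


def encode_groups(days):
    """Map each date to an integer group that preserves time order."""
    labels = [quarter_label(day) for day in days]
    # Sorting the distinct labels gives their chronological order.
    codes = {label: code for code, label in enumerate(sorted(set(labels)))}
    return [codes[label] for label in labels]


# Hypothetical sample dates: two in Q1, one in Q2, two in Q3.
days = [date(2020, 1, 15), date(2020, 2, 3), date(2020, 5, 20),
        date(2020, 8, 1), date(2020, 9, 30)]
print(encode_groups(days))  # [0, 0, 1, 2, 2]
```

&lt;p&gt;The codes are consecutive even when a quarter is absent from the data, which suits a splitter that walks from one integer to the next. Wrapping the result in &lt;code&gt;np.array(...)&lt;/code&gt; gives the array type that the &lt;code&gt;split&lt;/code&gt; method above expects.&lt;/p&gt;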

&lt;p&gt;The method is named &lt;code&gt;split&lt;/code&gt; to follow the naming that scikit-learn's own cross-validation splitters use. Here, I iterate over a range of integers (groups) to keep the order of dates. Then I assign the indices of the first group (t) to be training indices and those of the next group (t + 1) to be validation indices. In the end, the method yields the training and testing indices, because the &lt;code&gt;cv&lt;/code&gt; parameter of &lt;code&gt;GridSearchCV&lt;/code&gt; accepts an iterable that yields pairs of training and testing indices.&lt;/p&gt;

&lt;p&gt;The example below displays how the custom split works with the groups. To have different sizes of date groups, I created 4 groups with 5 instances of 0s, 10 instances of 1s, 10 instances of 2s, and 5 instances of 3s:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;X_experiment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;y_experiment&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;make_regression&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;n_samples&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;n_features&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;noise&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;groups_experiment&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;concatenate&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;zeros&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;  &lt;span class="c1"&gt;# 5 0s
&lt;/span&gt;                                    &lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ones&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;  &lt;span class="c1"&gt;# 10 1s
&lt;/span&gt;                                    &lt;span class="mi"&gt;2&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ones&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;  &lt;span class="c1"&gt;# 10 2s
&lt;/span&gt;                                    &lt;span class="mi"&gt;3&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ones&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;# 5 3s
&lt;/span&gt;                                    &lt;span class="p"&gt;]).&lt;/span&gt;&lt;span class="nf"&gt;astype&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;int&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;idx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;y&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nf"&gt;enumerate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;CustomCrossValidation&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;split&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;X_experiment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                &lt;span class="n"&gt;y_experiment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                &lt;span class="n"&gt;groups_experiment&lt;/span&gt;&lt;span class="p"&gt;)):&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Split number: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;idx&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Training indices: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Test indices: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;y&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the groupings, the example dataset looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# The first 5 predictor values...
          0         1         2         3         4
0 -0.566298  0.099651  2.190456 -0.503476 -0.990536
1  0.174578  0.257550  0.404051 -0.074446  1.886186
2  0.314247 -0.908024 -0.562288 -1.412304 -1.012831
3 -1.106335 -1.196207 -0.479174  0.812526 -0.185659
4 -0.013497 -1.057711 -0.601707  0.822545  1.852278

# The first 5 target values...
            0
0   73.398681
1  195.221637
2 -139.402678
3 -124.863423
4   94.753517

# Groupings for the example dataset...
# The 0s are older date anchor values, whereas 3s the newest...
[0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3 3 3 3]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The groups give the validation flow its order. First the 0s are used as the training set and the 1s as validation. Then the 1s are used as training and the 2s as validation, and so on. The output of the example generated indices will be:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Split number: 0
Training indices: [0 1 2 3 4]
Test indices: [ 5  6  7  8  9 10 11 12 13 14]

Split number: 1
Training indices: [ 5  6  7  8  9 10 11 12 13 14]
Test indices: [15 16 17 18 19 20 21 22 23 24]

Split number: 2
Training indices: [15 16 17 18 19 20 21 22 23 24]
Test indices: [25 26 27 28 29]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As an example setup, I will use Lasso Regression and try to optimize alpha with Grid Search. In Lasso, a larger alpha forces more coefficients to be 0. It is very common to search for the optimum value of alpha in a Lasso Regression.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
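&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sklearn&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;linear_model&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sklearn.model_selection&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;GridSearchCV&lt;/span&gt;

&lt;span class="c1"&gt;# Instantiating the Lasso estimator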
&lt;/span&gt;&lt;span class="n"&gt;reg_estimator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;linear_model&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Lasso&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="c1"&gt;# Parameters
&lt;/span&gt;&lt;span class="n"&gt;parameters_to_search&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;alpha&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mf"&gt;0.1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;]}&lt;/span&gt;
&lt;span class="c1"&gt;# Splitter
&lt;/span&gt;&lt;span class="n"&gt;custom_splitter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;CustomCrossValidation&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;split&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;X&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;X_experiment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;y&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;y_experiment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;groups&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;groups_experiment&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Search setup
&lt;/span&gt;&lt;span class="n"&gt;reg_search&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;GridSearchCV&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;estimator&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;reg_estimator&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;param_grid&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;parameters_to_search&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;scoring&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;neg_root_mean_squared_error&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;cv&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;custom_splitter&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="c1"&gt;# Fitting
&lt;/span&gt;&lt;span class="n"&gt;best_model&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;reg_search&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;fit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;X&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;X_experiment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;y&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;y_experiment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;groups&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;groups_experiment&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This will output the best estimator as follows, using the custom cross-validation. There are 3 splits because the 4 groups form 3 consecutive pairs of training and validation groups.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Best model:
Lasso(alpha=0.1)

# Number of splits:
3
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Voilà: a simple generator was enough to plug a custom validation flow into a Grid Search optimization. I enjoy reading the scikit-learn documentation. Besides being fun, it helps me understand some statistical implementations better and tweak them whenever necessary.&lt;/p&gt;

&lt;p&gt;For a complete set of examples, please refer to &lt;a href="https://github.com/nazliander/scikit-learn-programming-examples" rel="noopener noreferrer"&gt;the GitHub repository&lt;/a&gt;. Happy reading the documentation!&lt;/p&gt;

</description>
      <category>machinelearning</category>
      <category>python</category>
      <category>datascience</category>
    </item>
    <item>
      <title>Windowing in Streaming Data: Theory and a Scikit-Multiflow Example</title>
      <dc:creator>Nazli Ander</dc:creator>
      <pubDate>Fri, 14 Aug 2020 06:37:30 +0000</pubDate>
      <link>https://forem.com/nazliander/windowing-in-streaming-data-theory-and-a-scikit-multiflow-example-55ji</link>
      <guid>https://forem.com/nazliander/windowing-in-streaming-data-theory-and-a-scikit-multiflow-example-55ji</guid>
      <description>&lt;p&gt;With streaming data, it is almost impossible to store sufficient statistics over all the examples because of memory constraints. Moreover, the analysts in information systems departments are usually more interested in the recent changes in the databases. With time, the properties of the datasets might change, and the older data may not give the most important information to a statistical learning algorithm. Such changes in the properties of data over time are called concept drift. To increase memory efficiency and overcome concept drift, windowing mechanisms were developed [1].&lt;/p&gt;

&lt;p&gt;In this short review, I want to present four commonly known windowing algorithms on a theoretical level. From the simplest to the most complex, these are: Landmark, Sliding, Time-Fading, and Adaptive Sliding.&lt;/p&gt;

&lt;p&gt;On the applied level, &lt;a href="https://scikit-multiflow.github.io/" rel="noopener noreferrer"&gt;scikit-multiflow&lt;/a&gt; [2] nicely complements scikit-learn with streaming analytics algorithms. Out of enthusiasm for the library and the algorithm, I applied ADWIN to detect concept drift, and I share that experiment at the end.&lt;/p&gt;

&lt;h2&gt;
  
  
  Landmark
&lt;/h2&gt;

&lt;p&gt;A landmark window contains the whole historical data, starting from a landmark point. In the example chart, the alpha point is chosen as the landmark, so a system following the landmark mechanism includes all the data from the alpha point onwards. The landmark mechanism is used when the statistical analysis needs periodic results (yearly, quarterly, monthly).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fr9enifcfvo9t0x3w5q6v.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fr9enifcfvo9t0x3w5q6v.png" alt="Landmark Method" width="800" height="440"&gt;&lt;/a&gt;&lt;/p&gt;
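
&lt;p&gt;To make the idea concrete, here is a minimal sketch of a landmark window that only keeps running statistics. The class name &lt;code&gt;LandmarkWindow&lt;/code&gt; and its methods are hypothetical and not part of scikit-multiflow; the sketch assumes that choosing a new landmark simply resets the statistics.&lt;/p&gt;

```python
class LandmarkWindow:
    """Hypothetical helper: running statistics since the last landmark."""

    def __init__(self):
        self.count = 0
        self.total = 0.0

    def add(self, value):
        # Every value after the landmark contributes with equal weight.
        self.count += 1
        self.total += value

    def reset(self):
        # Choosing a new landmark (e.g. the start of a month) drops the history.
        self.count = 0
        self.total = 0.0

    @property
    def mean(self):
        return self.total / self.count if self.count else 0.0


window = LandmarkWindow()
for value in [1.0, 2.0, 3.0]:
    window.add(value)
mean_before_reset = window.mean  # mean over everything since the landmark

window.reset()  # a new landmark point is chosen
window.add(10.0)
mean_after_reset = window.mean  # only data after the new landmark counts
```

&lt;p&gt;Because only a count and a sum are stored, memory usage stays constant however long the period between two landmarks is.&lt;/p&gt;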

&lt;h2&gt;
  
  
  Sliding Window
&lt;/h2&gt;

&lt;p&gt;A sliding window contains the data belonging to a time interval with fixed recency (ꞵ) and binary weighting: a data point is either inside the window or not. ꞵ defines the window length, so the starting point (timestamp) is calculated by subtracting ꞵ from the current timestamp (t). This requires the older data to be deleted in each iteration. A sliding window is used when the exact number of data points is critical for the statistical analysis, e.g., traffic monitoring or topic extraction on a news portal [3].&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fu5r3veo7nxltrqi9iao3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fu5r3veo7nxltrqi9iao3.png" alt="Sliding Window Method" width="800" height="600"&gt;&lt;/a&gt;&lt;/p&gt;
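
&lt;p&gt;The deletion step can be sketched with a double-ended queue. This is only an illustration under my own assumptions: the class name &lt;code&gt;SlidingWindow&lt;/code&gt; is hypothetical, timestamps are plain numbers that arrive in increasing order, and the window covers the interval from t - ꞵ (exclusive) to t (inclusive).&lt;/p&gt;

```python
from collections import deque


class SlidingWindow:
    """Hypothetical helper: keeps only the items of the last `beta` time units."""

    def __init__(self, beta):
        self.beta = beta
        self.items = deque()  # (timestamp, value) pairs, oldest first

    def add(self, timestamp, value):
        self.items.append((timestamp, value))
        start = timestamp - self.beta  # starting point of the window
        # Binary weighting: anything at or before the starting point is deleted.
        while self.items and start >= self.items[0][0]:
            self.items.popleft()

    def values(self):
        return [value for _, value in self.items]


window = SlidingWindow(beta=3)
for t in range(1, 7):
    window.add(t, t * 10)

current_values = window.values()  # values observed at t = 4, 5 and 6
```

&lt;p&gt;Since the oldest items always sit at the left end of the queue, each deletion is cheap, and the memory footprint is bounded by the number of items that fit into ꞵ.&lt;/p&gt;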

&lt;h2&gt;
  
  
  Time-Fading Window
&lt;/h2&gt;

&lt;p&gt;A time-fading window is also called a damped window. It contains the data belonging to a time frame with fixed recency (ꞵ) and a linear or exponential decay factor (f). The continuous decay takes recency into account: it puts more weight on recent data and less on older data, down to a certain threshold.&lt;/p&gt;

&lt;p&gt;The time-fading window method can be considered as a special case of a sliding window or landmark method, depending on how the recency is defined.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F8gx9y9a6ip5p2pyx1ilo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F8gx9y9a6ip5p2pyx1ilo.png" alt="Time-Fading Window Method" width="800" height="647"&gt;&lt;/a&gt;&lt;/p&gt;
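
&lt;p&gt;As a rough illustration of the exponential variant, the sketch below keeps a decayed mean. The class name &lt;code&gt;FadingMean&lt;/code&gt; and the decay value of 0.5 are my own assumptions, and for brevity the sketch applies no hard cut-off at ꞵ: old values only fade, they are never removed explicitly.&lt;/p&gt;

```python
class FadingMean:
    """Hypothetical helper: mean with exponentially fading weights."""

    def __init__(self, decay):
        self.decay = decay  # decay factor f, assumed to lie between 0 and 1
        self.weighted_sum = 0.0
        self.weight_total = 0.0

    def add(self, value):
        # Every earlier value is multiplied by f once more; the new value gets weight 1.
        self.weighted_sum = self.decay * self.weighted_sum + value
        self.weight_total = self.decay * self.weight_total + 1.0

    @property
    def mean(self):
        return self.weighted_sum / self.weight_total if self.weight_total else 0.0


stream = [0.0, 0.0, 1.0, 1.0]

fading = FadingMean(decay=0.5)
for value in stream:
    fading.add(value)

plain_mean = sum(stream) / len(stream)
fading_mean = fading.mean  # leans towards the two most recent values
```

&lt;p&gt;The fading mean ends up closer to the recent values than the plain mean, which is the emphasis on recent data that the damped window is designed for.&lt;/p&gt;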

&lt;h2&gt;
  
  
  Adaptive Sliding Window (ADWIN)
&lt;/h2&gt;

&lt;p&gt;ADWIN keeps a window of recent values and compares the mean values of its older and newer parts. As long as the difference stays below a threshold level (epsilon), the window is kept; if the means deviate significantly, ADWIN deletes the corresponding old part. It therefore adapts to changing data: when a change takes place the window shrinks automatically, and when the data is stationary the window grows to improve accuracy.&lt;/p&gt;

&lt;p&gt;A better visualization is presented on the &lt;a href="https://www.cs.upc.edu/~abifet/Prova/" rel="noopener noreferrer"&gt;personal website&lt;/a&gt; of Albert Bifet (author of Adaptive Stream Mining: Pattern Learning and Mining from Evolving Data Streams).&lt;/p&gt;
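
&lt;p&gt;The shrinking logic can be sketched naively in plain Python. This is not how scikit-multiflow implements ADWIN (the library compresses the window into buckets); it is a brute-force illustration that checks every split point. The function names are hypothetical, the threshold follows the Hoeffding-style bound from the original ADWIN description as I understand it, and the values are assumed to lie between 0 and 1.&lt;/p&gt;

```python
import math


def adwin_cut_threshold(n0, n1, delta):
    """Threshold (epsilon) for two sub-windows holding n0 and n1 values."""
    m = 1.0 / (1.0 / n0 + 1.0 / n1)
    delta_prime = delta / (n0 + n1)
    return math.sqrt(math.log(4.0 / delta_prime) / (2.0 * m))


def shrink_window(window, delta=0.002):
    """Drop the oldest values while any split shows clearly distinct means."""
    window = list(window)
    changed = True
    while changed and len(window) >= 2:
        changed = False
        for split in range(1, len(window)):
            old, recent = window[:split], window[split:]
            gap = abs(sum(old) / len(old) - sum(recent) / len(recent))
            if gap > adwin_cut_threshold(len(old), len(recent), delta):
                window = window[1:]  # forget the oldest value and re-check
                changed = True
                break
    return window


stationary = [0.5] * 50
drifting = [0.0] * 60 + [1.0] * 60  # the mean jumps halfway through

kept_stationary = shrink_window(stationary)
kept_drifting = shrink_window(drifting)
```

&lt;p&gt;On the stationary input nothing is removed, while on the drifting input most of the old zeros are dropped and the recent values are all kept. The brute-force check is far too slow for real streams; it is only meant to show the decision rule.&lt;/p&gt;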

&lt;p&gt;The intuition behind using ADWIN is to keep statistics from a window of variable size while detecting concept drift. By using the &lt;a href="https://scikit-multiflow.github.io/" rel="noopener noreferrer"&gt;scikit-multiflow&lt;/a&gt; library I simulated a distorted data stream with a normal distribution.&lt;/p&gt;

&lt;p&gt;The code below catches the concept drift in a stream drawn from a normal distribution (with a mean of 0 and a standard deviation of 0.25). I replaced the stream values at the indices from 1000 up to 2000 with values drawn from a different normal distribution (with a mean of 1 and a standard deviation of 0.5). Hence, I expected the window width to change (decrease) between the indices 1000 and 2000 and to increase again until the end of the stream.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;numpy&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;skmultiflow.drift_detection.adwin&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ADWIN&lt;/span&gt;

&lt;span class="n"&gt;adwin&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ADWIN&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;delta&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mf"&gt;0.0002&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;seed&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;42&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Simulating a data stream drawn from a normal distribution
&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sigma&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mf"&gt;0.25&lt;/span&gt;  &lt;span class="c1"&gt;# mean and standard deviation
&lt;/span&gt;&lt;span class="n"&gt;data_stream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;normal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sigma&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;4000&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Changing the data concept from index 1000 to 2000
&lt;/span&gt;&lt;span class="n"&gt;mu_broken&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sigma_broken&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mf"&gt;0.5&lt;/span&gt;
&lt;span class="n"&gt;data_stream&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;2000&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;np&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;normal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mu_broken&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sigma_broken&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;width_vs_variance&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;

&lt;span class="c1"&gt;# Adding stream elements to ADWIN and verifying if drift occurred
&lt;/span&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;idx&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nf"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;4000&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;

    &lt;span class="n"&gt;adwin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_element&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data_stream&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;idx&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;adwin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;detected_change&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Change in index &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;idx&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; for stream value &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;data_stream&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;idx&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;width_vs_variance&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="n"&gt;adwin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;width&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;adwin&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;variance&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;idx&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The chart below shows how the window width changes as concept drift is detected in our streaming data. As expected, the width zigzags (drops back) between the indices (t) 1000 and 2000, then grows until the end of the stream.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F1cy9xkw8qvi3er1yd3iu.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F1cy9xkw8qvi3er1yd3iu.png" alt="Test ADWIN" width="800" height="600"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;p&gt;[1] Gama, J., Sebastião, R. &amp;amp; Rodrigues, P.P. On evaluating stream learning algorithms. Mach Learn 90, 317–346 (2013).&lt;/p&gt;

&lt;p&gt;[2] Montiel, J., Read, J., Bifet, A., &amp;amp; Abdessalem, T. (2018). Scikit-multiflow: A multi-output streaming framework. The Journal of Machine Learning Research, 19(72):1−5.&lt;/p&gt;

&lt;p&gt;[3] Youn, J., Choi, J., Shim, J., &amp;amp; Lee, S. (2017). Partition-Based Clustering with Sliding Windows for Data Streams. Lecture Notes in Computer Science, 289–303.&lt;/p&gt;

</description>
      <category>python</category>
      <category>datascience</category>
      <category>dataengineering</category>
      <category>datastreams</category>
    </item>
    <item>
      <title>Scraping and Storing Crypto-currency Prices with Scala and PostgreSQL</title>
      <dc:creator>Nazli Ander</dc:creator>
      <pubDate>Sun, 02 Aug 2020 22:11:35 +0000</pubDate>
      <link>https://forem.com/nazliander/scraping-and-storing-asset-prices-with-scala-and-postgresql-35nn</link>
      <guid>https://forem.com/nazliander/scraping-and-storing-asset-prices-with-scala-and-postgresql-35nn</guid>
      <description>&lt;p&gt;Web scraping mostly involves text-intensive tasks such as product review scraping, gathering real-estate listings, or even tracking online reputation and presence. When an application scrapes only String data for qualitative analysis, it may not need type safety. However, when the end goal of the web scraping is quantitative analysis of prices or weather forecasts, a type-safe language can be quite handy.&lt;/p&gt;

&lt;p&gt;In this article, we aim to give a small and interesting example of scraping crypto-currency prices with Scala and storing them in a PostgreSQL database. To scrape the prices we chose the CoinMarketCap &lt;a href="https://coinmarketcap.com/" rel="noopener noreferrer"&gt;homepage&lt;/a&gt;. It is a crypto-currency knowledge website, which also gives information on market capitalizations (relative market sizes), circulating supply, and trading volumes. Even though it is fascinating to see all that information together, to keep it simple we will only scrape the prices.&lt;/p&gt;

&lt;p&gt;This article might be considered as a tutorial, and it requires a basic level of knowledge of docker-compose and Scala.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--4Gyraa4_--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/9358/1%2AXIv7wtoBBxf95mezwstG6Q.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--4Gyraa4_--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/9358/1%2AXIv7wtoBBxf95mezwstG6Q.jpeg" alt="A summary of the pipeline that this tutorial uses" width="800" height="225"&gt;&lt;/a&gt;&lt;em&gt;A summary of the pipeline that this tutorial uses&lt;/em&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Tools and Steps
&lt;/h2&gt;

&lt;p&gt;For web scraping in Scala, we will be using an HTML parsing library called scala-scraper with &lt;strong&gt;JSoup&lt;/strong&gt;. Following that, we will insert the scraped prices into the PostgreSQL database by using a functional JDBC tool called &lt;strong&gt;doobie&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Although we mentioned some fancy library and tool names, the real magic happens in case classes. For each call to the CoinMarketCap homepage, we aim to retrieve the long crypto-currency table with type safety. To do that we created &lt;code&gt;CoinCreate&lt;/code&gt; and &lt;code&gt;CoinInsert&lt;/code&gt; case classes and companion objects.&lt;/p&gt;

&lt;p&gt;We will first explain the case classes together with their companion objects, as we modeled the data while creating those. Then we will explain the simple functions for retrieving the updated price table from the homepage. Lastly, we will explain how we inserted the table records into a PostgreSQL database running locally. We can spin up the database with this simple &lt;a href="https://github.com/nazliander/scrape-insert-postgresql/blob/master/simple-tutorial/docker-compose-setup/docker-compose.yml" rel="noopener noreferrer"&gt;docker-compose file&lt;/a&gt;. In the docker-compose file, we initialize a PostgreSQL database named &lt;code&gt;dev&lt;/code&gt;, with the username &lt;code&gt;admin&lt;/code&gt; and the password &lt;code&gt;admin&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;The steps that are explained in this tutorial are displayed in the pipeline above.&lt;/p&gt;

&lt;h2&gt;
  
  
  Case Classes and Companion Objects
&lt;/h2&gt;

&lt;p&gt;Although there are different approaches to modeling the data, we can start by creating two case classes, &lt;em&gt;CoinCreate&lt;/em&gt; and &lt;em&gt;CoinInsert&lt;/em&gt;. They help us keep the data types safe while scraping the price table and inserting it into a database.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--XdL8XR0y--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/4900/1%2AQEZWeKAqcfOVbnDkQtHUFg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--XdL8XR0y--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/4900/1%2AQEZWeKAqcfOVbnDkQtHUFg.png" alt="A view from the CoinMarketCap homepage prices table" width="800" height="363"&gt;&lt;/a&gt;&lt;em&gt;A view from the CoinMarketCap homepage prices table&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;code&gt;CoinCreate&lt;/code&gt; aims to safely type a pair of a crypto-currency code and its current price. Thus, it has two parameters: &lt;code&gt;code&lt;/code&gt; (the currency code) and &lt;code&gt;price&lt;/code&gt; (the current price in USD). However, while thinking about its companion object, we need to consider the shape of the price records in each row. For instance, if we only use coin names and prices in our case class, their indices in each row's array of cells will be 1 and 3, much like column indices in a table.&lt;/p&gt;

&lt;p&gt;After observing the price table (see the screenshot from the homepage above), we decided to give the companion object an apply method that functionally transforms a List of Strings into a &lt;code&gt;CoinCreate&lt;/code&gt;. Although this transformation is not that straightforward, we can use helper functions to get only the coin code (&lt;code&gt;getCoinCode&lt;/code&gt;) and to transform the dollar price string into a double (&lt;code&gt;numberStringToDouble&lt;/code&gt;).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CoinCreate&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;code&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;object&lt;/span&gt; &lt;span class="nc"&gt;CoinCreate&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;listOfElements&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;List&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;])&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;CoinCreate&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;CoinCreate&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
      &lt;span class="n"&gt;code&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;getCoinCode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;listOfElements&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)),&lt;/span&gt;
      &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;numberStringToDouble&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;listOfElements&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
    &lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;dollarToNumber&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dlr&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Option&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;p&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"[0-9.]+"&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;r&lt;/span&gt;
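    &lt;span class="c1"&gt;// Drop thousands separators first, so that e.g. "$11,345.67" is not cut off at the comma.&lt;/span&gt;
    &lt;span class="nv"&gt;p&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;findFirstIn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;dlr&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;replace&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;","&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;""&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;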
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;numberStringToDouble&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strDlr&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Double&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;numberStr&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;dollarToNumber&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strDlr&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="nv"&gt;numberStr&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getOrElse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"0"&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;toDouble&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;getCoinCode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strCoin&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nv"&gt;strCoin&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;split&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;" "&lt;/span&gt;&lt;span class="o"&gt;)(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;CoinInsert&lt;/code&gt; aims to safely type a triple of a crypto-currency code, its current price, and a timestamp that logs the insertion time. We can use this case class while inserting a vector of &lt;code&gt;CoinCreate&lt;/code&gt; into PostgreSQL. As its parameters are so similar to those of &lt;code&gt;CoinCreate&lt;/code&gt;, we can create a simple companion object to transform a &lt;code&gt;CoinCreate&lt;/code&gt; into a &lt;code&gt;CoinInsert&lt;/code&gt;. This object’s apply method adds the current timestamp to a &lt;code&gt;CoinCreate&lt;/code&gt; to obtain a &lt;code&gt;CoinInsert&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Hence the only difference between a &lt;code&gt;CoinCreate&lt;/code&gt; case class and &lt;code&gt;CoinInsert&lt;/code&gt; case class will be the current &lt;code&gt;Timestamp&lt;/code&gt;, notated as a &lt;code&gt;logTimestamp&lt;/code&gt; parameter.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.sql.Timestamp&lt;/span&gt;

&lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CoinInsert&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;code&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;logTimestamp&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Timestamp&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;object&lt;/span&gt; &lt;span class="nc"&gt;CoinInsert&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;coin&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;CoinCreate&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;logTimestamp&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;CoinInsert&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;CoinInsert&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
      &lt;span class="n"&gt;code&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;coin&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;code&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
      &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;coin&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;price&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
      &lt;span class="n"&gt;logTimestamp&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Timestamp&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;logTimestamp&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Scraping Functions
&lt;/h2&gt;

&lt;p&gt;Scraping with &lt;strong&gt;scala-scraper&lt;/strong&gt; and &lt;strong&gt;JSoup&lt;/strong&gt; is quite easy. First, we need to send a &lt;code&gt;GET&lt;/code&gt; request to the homepage by creating a new &lt;strong&gt;JSoup&lt;/strong&gt; browser, which enables us to fetch HTML from the web. Since we only need HTML parsing, &lt;strong&gt;JSoup&lt;/strong&gt; was enough in this case; for pages that rely on JavaScript, other browser options could be used.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;siteConnect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;html&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;browser&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;JsoupBrowser&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;browser.DocumentType&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nv"&gt;browser&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;html&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;From the response of the &lt;code&gt;GET&lt;/code&gt; request, we need to find the main table and store it as a Vector of table rows. Luckily, when we specify that we are looking for a table element, &lt;strong&gt;scala-scraper&lt;/strong&gt;’s table method does the whole job for us.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;getCoinUpdatedTable&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;webPage&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                        &lt;span class="n"&gt;tableNameInHTML&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Vector&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;CoinCreate&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;site&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;siteConnect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;webPage&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;JsoupBrowser&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="c1"&gt;// Connects to the webpage.&lt;/span&gt;

  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;tab&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;site&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;table&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="s"&gt;"#${tableNameInHTML}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;// Gets the table with the given name.&lt;/span&gt;

  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;body&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;tab&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;slice&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;tab&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;length&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;// First index belongs to the header of the table.&lt;/span&gt;

  &lt;span class="nv"&gt;body&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;CoinCreate&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;x&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;_&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;text&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;toList&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="c1"&gt;// Table rows are mapped to CoinCreate case class.&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, we slice the &lt;code&gt;Vector&lt;/code&gt; from the second element to the end, because the first element contains the column names (headers). Each row in the sliced &lt;code&gt;Vector&lt;/code&gt; still holds the HTML elements of its cells rather than plain text. So we can use functional programming to map over all the table rows (&lt;code&gt;Vector&lt;/code&gt; elements), extract the text of each element, and transform the result into a &lt;code&gt;CoinCreate&lt;/code&gt; (the comfort of having a tailor-made apply function).&lt;/p&gt;

&lt;h2&gt;
  
  
  Insertion Functions
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;doobie&lt;/strong&gt; is an amazing functional JDBC layer for Scala. It provides a functional way to write any JDBC program. In this tutorial, we will keep it quite simple by writing only a connection Transactor to connect to the local PostgreSQL database and an insertion function to make the transactions with type-safety.&lt;/p&gt;

&lt;p&gt;To connect to the database, we need a Transactor that states the driver class (in our case the PostgreSQL driver), the connection URL, the user name, the password, and an Execution Context (EC). For non-blocking operations, &lt;strong&gt;doobie&lt;/strong&gt;’s Transactor uses a &lt;code&gt;ContextShift&lt;/code&gt;, which we provide as an &lt;code&gt;implicit val&lt;/code&gt; built from the EC. For testing our code, the &lt;strong&gt;doobie&lt;/strong&gt; documentation recommends using a synchronous EC.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;implicit&lt;/span&gt; &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;cs&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;IO&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;contextShift&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;ExecutionContexts&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;synchronous&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;xa&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;Transactor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;fromDriverManager&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;IO&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;
  &lt;span class="s"&gt;"org.postgresql.Driver"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// driver classname&lt;/span&gt;
  &lt;span class="s"&gt;"jdbc:postgresql://localhost:5432/dev"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// connect URL (driver-specific)&lt;/span&gt;
  &lt;span class="s"&gt;"admin"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// user&lt;/span&gt;
  &lt;span class="s"&gt;"admin"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="c1"&gt;// password&lt;/span&gt;
  &lt;span class="nv"&gt;ExecutionContexts&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;synchronous&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;For writing a row by row insertion function we can use SQL interpolation. The function has an input of &lt;code&gt;CoinInsert&lt;/code&gt; and an output of &lt;code&gt;Update0&lt;/code&gt; (representing a parameterized statement where the arguments are known).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;insertCoin&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;coinInsert&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;CoinInsert&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Update0&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="s"&gt;"""
     INSERT INTO coins (code, price, logTimestamp)
     VALUES (${coinInsert.code}, ${coinInsert.price}, ${coinInsert.logTimestamp})
     """&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;update&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Lastly, we can send a &lt;code&gt;GET&lt;/code&gt; request to the homepage by calling the &lt;code&gt;getCoinUpdatedTable&lt;/code&gt; function. This returns a &lt;code&gt;Vector&lt;/code&gt; of &lt;code&gt;CoinCreate&lt;/code&gt; values, one per table row.&lt;/p&gt;

&lt;p&gt;Consequently, we can use this Vector (&lt;code&gt;coinTable&lt;/code&gt;) to transform &lt;code&gt;CoinCreate&lt;/code&gt; to &lt;code&gt;CoinInsert&lt;/code&gt; case class and execute the insert statement we prepared in the previous step.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;coinTable&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt;
    &lt;span class="nf"&gt;getCoinUpdatedTable&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"https://coinmarketcap.com"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"currencies"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;coinTable&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;foreach&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; &lt;span class="n"&gt;coinCreate&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;logTimestamp&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;Calendar&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getInstance&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getTimeInMillis&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;coinInsert&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;CoinInsert&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;coinCreate&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;logTimestamp&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;insertCoin&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;coinInsert&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;run&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;transact&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;xa&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;unsafeRunSync&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Last Words
&lt;/h2&gt;

&lt;p&gt;Thanks to &lt;strong&gt;doobie&lt;/strong&gt;, with only a few lines we were able to scrape the crypto-currency prices from CoinMarketCap and insert those into a local PostgreSQL database. Although the code does its job for now, it could be extended with exception handling and monitoring. You can find the whole project in &lt;a href="https://github.com/nazliander/scrape-insert-postgresql/tree/master/simple-tutorial" rel="noopener noreferrer"&gt;this&lt;/a&gt; GitHub repository.&lt;/p&gt;

&lt;p&gt;This article was originally published &lt;a href="https://itnext.io/scraping-and-storing-crypto-currency-prices-with-scala-and-postgresql-a6bb86a66f74" rel="noopener noreferrer"&gt;in the following link.&lt;/a&gt;&lt;/p&gt;

</description>
      <category>scala</category>
      <category>database</category>
      <category>postgres</category>
      <category>functional</category>
    </item>
    <item>
      <title>Using Selenium With Python in a Docker Container</title>
      <dc:creator>Nazli Ander</dc:creator>
      <pubDate>Sun, 02 Aug 2020 21:15:59 +0000</pubDate>
      <link>https://forem.com/nazliander/using-selenium-within-a-docker-container-ghp</link>
      <guid>https://forem.com/nazliander/using-selenium-within-a-docker-container-ghp</guid>
      <description>&lt;p&gt;While web scraping, I came across many useful applications, such as listing old prices of financial assets or finding current news topics. Although those examples are quite interesting to apply, there was usually one main goal in the end: &lt;strong&gt;creating a database with the scraped information&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Whenever I went a bit further with scraping, I ended up on websites that use JavaScript to display the data I needed. Hence, I bumped into Selenium, which is a web testing and automation tool. In this small write-up, I aim to list some steps that I find quite useful while setting up Selenium within a Docker container.&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction to Selenium WebDriver
&lt;/h2&gt;

&lt;p&gt;Selenium WebDriver is a web automation or testing tool. It was created by &lt;a href="https://twitter.com/shs96c" rel="noopener noreferrer"&gt;Simon Stewart&lt;/a&gt; in 2006, as the first cross-platform testing framework that could control the browser from the OS level.&lt;/p&gt;

&lt;p&gt;So with Selenium, I can run automated actions on browsers (clicks, hovers, and form filling) by communicating with them directly. Official bindings exist for Java, Python, C#, Ruby and JavaScript, and community-maintained bindings cover further languages such as PHP, Perl and Go. Since I am more familiar with Python, I will be talking about it.&lt;/p&gt;

&lt;p&gt;To work on a browser, I need to choose among a set of browser options like Firefox, Chrome (Chromium), Edge, and Safari. As a personal opinion, Chrome with a headless option (not generating a user interface) is the most performant one, hence I will be sticking to that.&lt;/p&gt;

&lt;h2&gt;
  
  
  Pulling the Image and Setting Up Google Chrome
&lt;/h2&gt;

&lt;p&gt;To start my custom Selenium-Python image, I need a Python base image. In this write-up I picked version 3.8.&lt;/p&gt;

&lt;p&gt;Then I can install Google Chrome on top of it. Remember, without Google Chrome itself, Selenium has no browser to drive and cannot run our tasks. There are &lt;a href="https://www.ubuntuupdates.org/ppa/google_chrome" rel="noopener noreferrer"&gt;a few steps&lt;/a&gt; to apply for setting up Google Chrome in Linux:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Adding Google Chrome trusting keys to apt&lt;/li&gt;
&lt;li&gt;Adding Google Chrome stable version to the repositories&lt;/li&gt;
&lt;li&gt;Updating the repositories to see the stable version in apt&lt;/li&gt;
&lt;li&gt;Installing &lt;code&gt;google-chrome-stable&lt;/code&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight docker"&gt;&lt;code&gt;&lt;span class="k"&gt;FROM&lt;/span&gt;&lt;span class="s"&gt; python:3.8&lt;/span&gt;

&lt;span class="c"&gt;# Adding trusting keys to apt for repositories&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;wget &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-O&lt;/span&gt; - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add -

&lt;span class="c"&gt;# Adding Google Chrome to the repositories&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;sh &lt;span class="nt"&gt;-c&lt;/span&gt; &lt;span class="s1"&gt;'echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" &amp;gt;&amp;gt; /etc/apt/sources.list.d/google-chrome.list'&lt;/span&gt;

&lt;span class="c"&gt;# Updating apt to see and install Google Chrome&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;apt-get &lt;span class="nt"&gt;-y&lt;/span&gt; update

&lt;span class="c"&gt;# Installing Google Chrome (stable channel)&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;apt-get &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-y&lt;/span&gt; google-chrome-stable
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Installing Chrome Driver
&lt;/h2&gt;

&lt;p&gt;Selenium requires a driver interface to work with the defined browser. Hence, I need to find a way to install Chrome Driver in our Linux image. Here are the steps to follow for doing this:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Installing &lt;code&gt;unzip&lt;/code&gt;, as it is needed for the zipped Chrome Driver&lt;/li&gt;
&lt;li&gt;Downloading the Chrome Driver archive to a file called &lt;code&gt;/tmp/chromedriver.zip&lt;/code&gt; (this name can be changed)&lt;/li&gt;
&lt;li&gt;Unzipping &lt;code&gt;/tmp/chromedriver.zip&lt;/code&gt; into a directory on the Linux executable path&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;After those steps, I need to set the &lt;code&gt;DISPLAY&lt;/code&gt; environment variable to &lt;code&gt;:99&lt;/code&gt;, &lt;a href="https://stackoverflow.com/questions/39219336/cannot-open-firefox-browser-selenium-automation-script-error-cannot-open-disp/39219625" rel="noopener noreferrer"&gt;as Selenium is using this&lt;/a&gt;. It will avoid some crashes.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight docker"&gt;&lt;code&gt;&lt;span class="c"&gt;# Installing Unzip&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;apt-get &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-yqq&lt;/span&gt; unzip

&lt;span class="c"&gt;# Download the Chrome Driver&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;wget &lt;span class="nt"&gt;-O&lt;/span&gt; /tmp/chromedriver.zip http://chromedriver.storage.googleapis.com/&lt;span class="sb"&gt;`&lt;/span&gt;curl &lt;span class="nt"&gt;-sS&lt;/span&gt; chromedriver.storage.googleapis.com/LATEST_RELEASE&lt;span class="sb"&gt;`&lt;/span&gt;/chromedriver_linux64.zip

&lt;span class="c"&gt;# Unzip the Chrome Driver into /usr/local/bin directory&lt;/span&gt;
&lt;span class="k"&gt;RUN &lt;/span&gt;unzip /tmp/chromedriver.zip chromedriver &lt;span class="nt"&gt;-d&lt;/span&gt; /usr/local/bin/

&lt;span class="c"&gt;# Set display port as an environment variable&lt;/span&gt;
&lt;span class="k"&gt;ENV&lt;/span&gt;&lt;span class="s"&gt; DISPLAY=:99&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
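
&lt;p&gt;Before running any Selenium code, it can help to confirm that both binaries actually ended up on the image's executable path. The snippet below is only a minimal sketch using Python's standard library. The function name &lt;code&gt;missing_binaries&lt;/code&gt; is hypothetical, and the default command names &lt;code&gt;google-chrome&lt;/code&gt; and &lt;code&gt;chromedriver&lt;/code&gt; are assumptions about what the two installation steps put on the path.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import shutil


def missing_binaries(names=("google-chrome", "chromedriver")):
    """Return the command names that cannot be found on the executable path."""
    # shutil.which returns None when a command is not on PATH.
    # The default names are assumptions, adjust them to your image.
    return [name for name in names if shutil.which(name) is None]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Calling &lt;code&gt;missing_binaries()&lt;/code&gt; inside the container should return an empty list if both installations succeeded. Any name it returns points to the step that needs another look.&lt;/p&gt;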



&lt;h2&gt;
  
  
  Preparing the Docker for a Run
&lt;/h2&gt;

&lt;p&gt;All the steps above were only for setting up Chrome in our Dockerfile. To run my Python application (&lt;code&gt;app.py&lt;/code&gt;) using Docker, I need to add the following lines to the Dockerfile.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight docker"&gt;&lt;code&gt;&lt;span class="k"&gt;COPY&lt;/span&gt;&lt;span class="s"&gt; . /app&lt;/span&gt;
&lt;span class="k"&gt;WORKDIR&lt;/span&gt;&lt;span class="s"&gt; /app&lt;/span&gt;

&lt;span class="k"&gt;RUN &lt;/span&gt;pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;--upgrade&lt;/span&gt; pip

&lt;span class="k"&gt;RUN &lt;/span&gt;pip &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-r&lt;/span&gt; requirements.txt

&lt;span class="k"&gt;CMD&lt;/span&gt;&lt;span class="s"&gt; ["python", "./app.py"]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Apart from those Docker settings, I would like to briefly mention some Docker-specific Chrome options to use while setting up the Chrome Driver via Python. I show these few options explicitly in one function, &lt;code&gt;set_chrome_options&lt;/code&gt;, in the example code below. I need three command-line arguments and one preference to run the Chrome Driver inside Docker:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Explicitly saying that this is a headless application with &lt;code&gt;--headless&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Explicitly bypassing the security level in Docker with &lt;code&gt;--no-sandbox&lt;/code&gt;. There is &lt;a href="https://stackoverflow.com/questions/50642308/webdriverexception-unknown-error-devtoolsactiveport-file-doesnt-exist-while-t" rel="noopener noreferrer"&gt;a nice Stack Overflow thread&lt;/a&gt; about this: apparently, because everything in the container runs as the root user by default, Chrome crashes without this flag.&lt;/li&gt;
&lt;li&gt;Explicitly disabling the usage of &lt;code&gt;/dev/shm/&lt;/code&gt;. The &lt;code&gt;/dev/shm&lt;/code&gt; partition is too small in certain VM environments, &lt;a href="http://crbug.com/715363" rel="noopener noreferrer"&gt;causing Chrome to fail or crash&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;Disabling the images with &lt;code&gt;chrome_prefs["profile.default_content_settings"] = {"images": 2}&lt;/code&gt;.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;selenium.webdriver.chrome.options&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Options&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;selenium&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;webdriver&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;set_chrome_options&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Options&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Sets chrome options for Selenium.
    Chrome options for headless browser is enabled.
    &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;chrome_options&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Options&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;chrome_options&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;--headless&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;chrome_options&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;--no-sandbox&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;chrome_options&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;--disable-dev-shm-usage&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;chrome_prefs&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
    &lt;span class="n"&gt;chrome_options&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;experimental_options&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;prefs&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;chrome_prefs&lt;/span&gt;
    &lt;span class="n"&gt;chrome_prefs&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;profile.default_content_settings&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;images&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;chrome_options&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;__main__&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;driver&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;webdriver&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Chrome&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nf"&gt;set_chrome_options&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
    &lt;span class="c1"&gt;# Do stuff with your driver
&lt;/span&gt;    &lt;span class="n"&gt;driver&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
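
&lt;p&gt;The example above closes the driver only when everything in between succeeds. As a hedged sketch of one way to guarantee cleanup, the block below wraps the same three Docker arguments in a context manager that always quits the browser. The names &lt;code&gt;DOCKER_CHROME_ARGS&lt;/code&gt;, &lt;code&gt;chrome_driver&lt;/code&gt; and &lt;code&gt;page_title&lt;/code&gt; are hypothetical, and the Selenium imports sit inside the function only so that the module can still be imported where Selenium is not installed.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from contextlib import contextmanager

# The three Docker-specific arguments discussed above (the name is hypothetical).
DOCKER_CHROME_ARGS = ("--headless", "--no-sandbox", "--disable-dev-shm-usage")


@contextmanager
def chrome_driver(arguments=DOCKER_CHROME_ARGS):
    """Yield a Chrome driver and always quit it, even if scraping fails."""
    # Imported lazily so this module can be imported without Selenium installed.
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options

    options = Options()
    for argument in arguments:
        options.add_argument(argument)

    driver = webdriver.Chrome(options=options)
    try:
        yield driver
    finally:
        # quit() ends the whole browser session; close() only closes the current window.
        driver.quit()


def page_title(url):
    """Open the given URL in headless Chrome and return the page title."""
    with chrome_driver() as driver:
        driver.get(url)
        return driver.title
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;With this shape, any scraping logic goes inside the &lt;code&gt;with&lt;/code&gt; block, and an exception raised there still leaves no Chrome process behind in the container.&lt;/p&gt;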



&lt;h2&gt;
  
  
  Last Words
&lt;/h2&gt;

&lt;p&gt;Here is the Dockerfile that I took as &lt;a href="https://github.com/nazliander/scrape-nr-of-deaths-istanbul/blob/master/Dockerfile" rel="noopener noreferrer"&gt;an example.&lt;/a&gt; While creating this, I used the links that I shared to solve the problems that I faced. There might be other solutions to those problems, and I am curious to hear about them.&lt;/p&gt;

&lt;p&gt;Until now, I have used it to scrape web archives for asset prices, books, yellow pages, and judgment texts. Although Selenium is not designed for web scraping, I leveraged this nice tool for taming websites that rely on JavaScript. But I should admit that, if the information I was looking for had not been hidden behind JavaScript, I would definitely have been a lot happier using only &lt;code&gt;Requests&lt;/code&gt;, &lt;code&gt;BeautifulSoup4&lt;/code&gt; and/or &lt;code&gt;Scrapy&lt;/code&gt; for Python, because all of those are simpler to set up and more performant.&lt;/p&gt;

&lt;p&gt;Happy Scraping!&lt;/p&gt;

</description>
      <category>python</category>
      <category>docker</category>
      <category>testing</category>
      <category>linux</category>
    </item>
  </channel>
</rss>
