<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Ayoub Fakir</title>
    <description>The latest articles on Forem by Ayoub Fakir (@fakirsayoub).</description>
    <link>https://forem.com/fakirsayoub</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F139002%2F5d4cbe51-67e5-4fe9-8ee4-fb8497e34244.png</url>
      <title>Forem: Ayoub Fakir</title>
      <link>https://forem.com/fakirsayoub</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/fakirsayoub"/>
    <language>en</language>
    <item>
      <title>Migrating from a plain Spark Application to ZIO with ZparkIO</title>
      <dc:creator>Ayoub Fakir</dc:creator>
      <pubDate>Fri, 16 Oct 2020 21:38:46 +0000</pubDate>
      <link>https://forem.com/fakirsayoub/migrating-from-a-plain-spark-application-to-zio-with-zparkio-3d3p</link>
      <guid>https://forem.com/fakirsayoub/migrating-from-a-plain-spark-application-to-zio-with-zparkio-3d3p</guid>
      <description>&lt;p&gt;In this article, we'll see how you can migrate your Spark Application into &lt;a href="https://zio.dev"&gt;ZIO&lt;/a&gt; and &lt;a href="https://github.com/leobenkel/ZparkIO"&gt;ZparkIO&lt;/a&gt;, so you can benefit from all the wonderful features that ZIO offers and that we'll be discussing.&lt;/p&gt;

&lt;h2&gt;
  
  
  What is ZIO?
&lt;/h2&gt;

&lt;p&gt;According to the official documentation, ZIO is &lt;strong&gt;a library for asynchronous and concurrent programming that is based on pure functional programming.&lt;/strong&gt; In other words, ZIO helps us write type-safe, composable and easily testable code, built from safe, side-effect-free descriptions of what the program should do.&lt;br&gt;
&lt;strong&gt;ZIO is a data type&lt;/strong&gt;. Its signature, &lt;em&gt;ZIO[R, E, A]&lt;/em&gt;, shows us that it has three type parameters:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;R&lt;/strong&gt;, which is the environment type: it describes what the effect requires in order to run. The environment is optional; when the effect needs nothing, &lt;strong&gt;R&lt;/strong&gt; is &lt;code&gt;Any&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;E&lt;/strong&gt;, which is the error with which the effect &lt;em&gt;may&lt;/em&gt; fail.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;A&lt;/strong&gt;, which is the data type that is returned in case of success.&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  How do we apply ZIO to Spark?
&lt;/h2&gt;

&lt;p&gt;From the signature of the &lt;strong&gt;ZIO&lt;/strong&gt; data type, we can deduce that to use Spark with ZIO, we need to specify one parameter in particular: the &lt;strong&gt;R&lt;/strong&gt;. In our case, the &lt;strong&gt;R&lt;/strong&gt; is the &lt;strong&gt;SparkSession&lt;/strong&gt;, which is the entry point to every Spark job and lives in the &lt;strong&gt;Spark Driver&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Integrating ZIO into our Spark programs can be hard, especially for beginners. Thankfully, &lt;a href="https://leobenkel.com"&gt;Leo Benkel&lt;/a&gt; has created &lt;strong&gt;&lt;a href="https://github.com/leobenkel/ZparkIO"&gt;ZparkIO&lt;/a&gt;&lt;/strong&gt;, a library that takes care of the whole process of marrying Spark and ZIO together!&lt;/p&gt;

&lt;h2&gt;
  
  
  Using ZparkIO to bootstrap a Spark / ZIO project
&lt;/h2&gt;

&lt;p&gt;The first step is to create a trait that extends &lt;code&gt;ZparkioApp[R, E, A]&lt;/code&gt;, where you need to override two methods: &lt;code&gt;makeCli&lt;/code&gt; and &lt;code&gt;runApp&lt;/code&gt;.&lt;br&gt;
&lt;code&gt;makeCli(args: List[String])&lt;/code&gt; parses all the program arguments for you (for the moment, &lt;em&gt;scallop&lt;/em&gt; is used by default, but we're in the process of extracting this module so you can use your CLI tool of choice).&lt;br&gt;
&lt;code&gt;runApp():ZIO[COMPLETE_ENV, E, A]&lt;/code&gt; is the main function where your program's logic resides.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;override&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;runApp&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;ZIO&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;COMPLETE_ENV&lt;/span&gt;, &lt;span class="kt"&gt;Throwable&lt;/span&gt;, &lt;span class="kt"&gt;Unit&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;???&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;Once you're set up, your SparkSession can be accessed as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nc"&gt;SparkModule&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And if you want to access Spark's implicits, the trick is to map over &lt;code&gt;SparkModule()&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nc"&gt;SparkModule&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; &lt;span class="n"&gt;ss&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;ss.implicits._&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;???&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
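
&lt;p&gt;To make the role of the environment &lt;strong&gt;R&lt;/strong&gt; more concrete, here is a minimal, Spark-free sketch, assuming ZIO 1.x. &lt;code&gt;JobConfig&lt;/code&gt; and &lt;code&gt;describeInput&lt;/code&gt; are hypothetical names made up for illustration, and this is not how ZparkIO implements &lt;code&gt;SparkModule&lt;/code&gt;; it only shows the general idea of an effect that declares what it needs and receives it later.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;import zio._

object EnvironmentSketch extends zio.App {
  // Hypothetical environment: stands in for whatever the effect needs in order to run.
  final case class JobConfig(inputPath: String)

  // R = JobConfig, E = Nothing (cannot fail), A = String
  val describeInput: URIO[JobConfig, String] =
    ZIO.access[JobConfig](config =&amp;gt; s"reading from ${config.inputPath}")

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    describeInput
      .provide(JobConfig("/data/input")) // the JobConfig requirement is now satisfied
      .flatMap(message =&amp;gt; console.putStrLn(message))
      .exitCode
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Running this sketch should print &lt;code&gt;reading from /data/input&lt;/code&gt;. The effect itself never mentions where the configuration comes from, which is the same separation that lets your Spark code ask for a session without building one.&lt;/p&gt;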



&lt;h2&gt;
  
  
  Migrating your project
&lt;/h2&gt;

&lt;p&gt;Once your project is set up, migrating it can be pretty straightforward.&lt;/p&gt;

&lt;h3&gt;
  
  
  Defining your program's arguments
&lt;/h3&gt;

&lt;p&gt;ZparkIO uses &lt;strong&gt;&lt;a href="https://github.com/scallop/scallop"&gt;scallop&lt;/a&gt;&lt;/strong&gt;. So to define the CLI arguments of your program you would need to define a case class, &lt;em&gt;Arguments&lt;/em&gt;, which has the following signature:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Arguments&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;List&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;])&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="k"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;ScallopConf&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nv"&gt;CommandLineArguments&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;Service&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;???&lt;/span&gt; &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, each one of your arguments has to be declared as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;argumentOne&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;ScallopOption&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;opt&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;default&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;None&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;required&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;noshort&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;descr&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"Description of this argument"&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;HINT: The argument defined here is called "argumentOne"; however, it has to be called as &lt;strong&gt;argument-one&lt;/strong&gt; from the CLI while executing your project. Scallop logic! :D&lt;/p&gt;
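
&lt;p&gt;As a rough, standalone sketch of that naming rule, the snippet below uses plain Scallop only. &lt;code&gt;SketchArguments&lt;/code&gt; and &lt;code&gt;ScallopSketch&lt;/code&gt; are hypothetical names, the class does not mix in ZparkIO's &lt;code&gt;CommandLineArguments.Service&lt;/code&gt;, and it assumes a Scallop version that derives the option name from the &lt;code&gt;val&lt;/code&gt; name as described in the hint above.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;import org.rogach.scallop._

// Hypothetical standalone configuration, for illustration only.
class SketchArguments(input: Seq[String]) extends ScallopConf(input) {
  // No explicit name is given, so Scallop derives it from the val name.
  val argumentOne: ScallopOption[String] = opt[String](
    required = true,
    noshort = true,
    descr = "Description of this argument"
  )
  verify() // must be called once all the options are declared
}

object ScallopSketch extends App {
  // The option is passed in its hyphenated form on the command line.
  val arguments = new SketchArguments(Seq("--argument-one", "some-value"))
  println(arguments.argumentOne())
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
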

&lt;h3&gt;
  
  
  Migrating your code
&lt;/h3&gt;

&lt;p&gt;The first step is that &lt;strong&gt;every function/helper that you have in your program needs to start returning something of type ZIO&lt;/strong&gt;. As an example, let's take two versions of a method that reads data from an external file system: one without ZparkIO, and one that uses it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;readData&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;inputPath&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;)(&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="k"&gt;implicit&lt;/span&gt; &lt;span class="n"&gt;sparkSession&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;SparkSession&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Dataset&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="nv"&gt;sparkSession&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;read&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parquet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;inputPath&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In a plain Spark application, this method returns a Dataset of some type A, and… That's it! If we fail to read the inputPath given as a parameter for some reason, the whole program crashes, and we do not catch the reason why (at least not at first sight).&lt;br&gt;
This same method, using ZIO and ZparkIO will be written as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;readData&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="n"&gt;inputPath&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;ZIO&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;SparkModule&lt;/span&gt;, &lt;span class="kt"&gt;Throwable&lt;/span&gt;, &lt;span class="kt"&gt;Dataset&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;]]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nc"&gt;SparkModule&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;dataset&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nc"&gt;Task&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;read&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parquet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;inputPath&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;dataset&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;or&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;readData&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="n"&gt;inputPath&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;ZIO&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;SparkModule&lt;/span&gt;, &lt;span class="kt"&gt;Throwable&lt;/span&gt;, &lt;span class="kt"&gt;Dataset&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;]]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="nc"&gt;SparkModule&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;_&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;read&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parquet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;inputPath&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Please note that we didn't have to use an implicit parameter, as the session is already provided by the SparkModule() method of the library. The function &lt;strong&gt;readData&lt;/strong&gt; requires a Spark environment, namely &lt;strong&gt;SparkModule&lt;/strong&gt;, may fail with a Throwable, and in case of success, returns a Dataset of some type A (I'm using Datasets here instead of DataFrames because first, we need some better typing, and second, Leo is allergic to DataFrames).&lt;br&gt;
The instruction to read data from the filesystem is wrapped into a &lt;em&gt;Task&lt;/em&gt;. Task[A] is an alias for IO[Throwable, A], which means that it does not depend on any environment (its environment is &lt;em&gt;Any&lt;/em&gt;). Then we yield the dataset we just read, which matches the return type of our function.&lt;br&gt;
Once your function is wrapped in a Task like this, you can start leveraging ZIO features such as &lt;code&gt;retry&lt;/code&gt; and &lt;code&gt;timeout&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;protected&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;retryPolicy&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="nv"&gt;Schedule&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;recurs&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="nv"&gt;Schedule&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;exponential&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;TimeUnit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;SECONDS&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;readData&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="n"&gt;inputPath&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;ZIO&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;SparkModule&lt;/span&gt;, &lt;span class="kt"&gt;Throwable&lt;/span&gt;, &lt;span class="kt"&gt;Dataset&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;]]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nc"&gt;SparkModule&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;dataset&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nc"&gt;Task&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;read&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parquet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;inputPath&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;retry&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;retryPolicy&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;timeoutFail&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ZparkioApplicationTimeoutException&lt;/span&gt;&lt;span class="o"&gt;())(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;TimeUnit&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;MINUTES&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;dataset&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this example, if the &lt;code&gt;read&lt;/code&gt; function fails, it will be retried up to 3 times with an exponentially growing wait between retries, but the total amount of time spent on this task cannot exceed 10 minutes.&lt;/p&gt;
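
&lt;p&gt;If you want to watch a retry policy at work without a cluster, the following Spark-free sketch may help. It assumes ZIO 1.x; &lt;code&gt;flakyRead&lt;/code&gt; is a hypothetical stand-in for a read that fails twice before succeeding, and the delays are shortened so the program finishes quickly.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;import zio._
import zio.duration._

object RetrySketch extends zio.App {
  // Hypothetical flaky effect: fails on attempts 1 and 2, succeeds on attempt 3.
  def flakyRead(attempts: Ref[Int]): Task[String] =
    attempts.updateAndGet(_ + 1).flatMap { n =&amp;gt;
      if (n &amp;lt; 3) Task.fail(new RuntimeException(s"attempt $n failed"))
      else Task.succeed(s"read succeeded on attempt $n")
    }

  // Same shape as the policy above: at most 3 retries, exponentially growing delay.
  val retryPolicy = Schedule.recurs(3) &amp;amp;&amp;amp; Schedule.exponential(100.millis)

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    (for {
      attempts &amp;lt;- Ref.make(0)
      result   &amp;lt;- flakyRead(attempts)
                    .retry(retryPolicy)
                    .timeoutFail(new RuntimeException("timed out"))(10.seconds)
      _        &amp;lt;- console.putStrLn(result)
    } yield ()).exitCode
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The first two attempts fail, the policy schedules two retries, and the program should print &lt;code&gt;read succeeded on attempt 3&lt;/code&gt;. Because &lt;code&gt;timeoutFail&lt;/code&gt; is applied after &lt;code&gt;retry&lt;/code&gt;, the time limit covers all the attempts together rather than each one separately.&lt;/p&gt;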

&lt;h3&gt;
  
  
  Some other function examples:
&lt;/h3&gt;

&lt;p&gt;Obviously, your Spark program has more methods than just reading data from HDFS or S3. Let's take another example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;calculateSomeAggregations&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="n"&gt;ds&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Dataset&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;])&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Dataset&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;ds&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;groupBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"someColumn"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;agg&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="nf"&gt;sum&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;when&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"someOtherColumn"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="s"&gt;"value"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;lit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)).&lt;/span&gt;&lt;span class="py"&gt;otherwise&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;lit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;)))&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This function calculates some aggregations of some Dataset of type A. One easy way to begin your migration to ZIO and ZparkIO is to wrap its calculations into a &lt;em&gt;Task&lt;/em&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;calculateSomeAggregations&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="n"&gt;ds&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Dataset&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;])&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;IO&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;Throwable&lt;/span&gt;, &lt;span class="kt"&gt;Dataset&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;A&lt;/span&gt;&lt;span class="o"&gt;]]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Task&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;ds&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;groupBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"someColumn"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;agg&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="nf"&gt;sum&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;when&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"someOtherColumn"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;===&lt;/span&gt; &lt;span class="s"&gt;"value"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;lit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)).&lt;/span&gt;&lt;span class="py"&gt;otherwise&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;lit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;)))&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And… Voilà! Your safe method is ready!&lt;/p&gt;
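
&lt;p&gt;To see what "safe" means in practice, here is a small Spark-free sketch, assuming ZIO 1.x. &lt;code&gt;riskyComputation&lt;/code&gt; is a hypothetical function that throws, standing in for a Spark call that blows up; wrapping it in a &lt;em&gt;Task&lt;/em&gt; turns the thrown exception into a value that the program can inspect.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;import zio._

object TaskSketch extends zio.App {
  // Hypothetical computation that throws an ArithmeticException when divisor is 0.
  def riskyComputation(divisor: Int): Int = 100 / divisor

  // Task(...) defers the computation and captures any thrown exception as a failure.
  def safeComputation(divisor: Int): Task[Int] = Task(riskyComputation(divisor))

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    (for {
      ok     &amp;lt;- safeComputation(4).either // success becomes a Right
      failed &amp;lt;- safeComputation(0).either // the exception becomes a Left
      _      &amp;lt;- console.putStrLn(s"ok = $ok")
      _      &amp;lt;- console.putStrLn(s"failed is Left: ${failed.isLeft}")
    } yield ()).exitCode
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This should print &lt;code&gt;ok = Right(25)&lt;/code&gt; followed by &lt;code&gt;failed is Left: true&lt;/code&gt;: the division by zero no longer crashes the program, it shows up in the error channel where you can retry it, log it, or recover from it.&lt;/p&gt;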

&lt;h2&gt;
  
  
  But after all, why would you need to migrate your project and start using ZIO?
&lt;/h2&gt;

&lt;p&gt;Well, one first and obvious argument is that it's… safer, more composable, and makes it easier to reason about the logic of your program.&lt;br&gt;
Alright, now let's take the following example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;df1&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;read&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parquet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"path1"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;df2&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;read&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parquet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"path2"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;df3&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;read&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parquet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"path3"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="err"&gt;…&lt;/span&gt;
&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;dfn&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;read&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parquet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"pathn"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We all know that Spark is the leading framework for distributed programming and parallel calculations. However, imagine that you are running a Spark program on an EMR cluster with 20 nodes, each with 32GB of RAM and 20 CPU cores. That's a lot of room for executors. You set up a cluster of this size because you do some heavy &lt;strong&gt;joins&lt;/strong&gt;, &lt;strong&gt;sorts&lt;/strong&gt; and &lt;strong&gt;groupBys&lt;/strong&gt;. But still, you read a lot of parquet partitions at the beginning of your program, and the problem is that… all the instructions shown earlier will be executed sequentially. When &lt;code&gt;df1&lt;/code&gt; is being read, it certainly does not use the whole capacity of your cluster, and the next instruction needs to wait for the first one to end, and so on. That's a lot of wasted resources.&lt;br&gt;
Thanks to &lt;strong&gt;ZIO&lt;/strong&gt; and its &lt;strong&gt;Fibers&lt;/strong&gt; feature, we can force those reads to run in parallel as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;df1&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nc"&gt;Task&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;read&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parquet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"path1"&lt;/span&gt;&lt;span class="o"&gt;)).&lt;/span&gt;&lt;span class="py"&gt;fork&lt;/span&gt;
&lt;span class="err"&gt; …&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;dfn&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nc"&gt;Task&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;read&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parquet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"pathn"&lt;/span&gt;&lt;span class="o"&gt;)).&lt;/span&gt;&lt;span class="py"&gt;fork&lt;/span&gt;

&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;readDf1&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nv"&gt;df1&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;join&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;readDf2&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nv"&gt;df2&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;join&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;strong&gt;fork&lt;/strong&gt; / &lt;strong&gt;join&lt;/strong&gt; combo will force those instructions to be executed in parallel, which keeps the waste of the cluster's resources to a minimum!&lt;br&gt;
Another useful method that we can use in this context is &lt;strong&gt;foreachPar&lt;/strong&gt;. Let's suppose that we have a list of paths of parquet partitions that we want to read in parallel, using our &lt;strong&gt;readData(inputPath: String)&lt;/strong&gt; method defined earlier:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;dfs&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nv"&gt;ZIO&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;foreachPar&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;filePaths&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;filePath&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="nf"&gt;readData&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;filePath&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;dfs&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The value that we return here is a List of the Datasets that we read, which we can now use throughout our program.&lt;br&gt;
Going even further, &lt;code&gt;foreachParN&lt;/code&gt; lets you specify how many tasks can run in parallel at most:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;dfs&lt;/span&gt; &lt;span class="k"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="nv"&gt;ZIO&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;foreachParN&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="o"&gt;)(&lt;/span&gt;&lt;span class="n"&gt;filePaths&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="n"&gt;filePath&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="nf"&gt;readData&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;filePath&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;yield&lt;/span&gt; &lt;span class="n"&gt;dfs&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This code will limit the parallel execution to &lt;code&gt;5&lt;/code&gt;, never more, which is a great way to manage your resources. This could even be a multiple of the number of executors available in the cluster.&lt;/p&gt;
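
&lt;p&gt;The bound can be observed without Spark. The sketch below assumes ZIO 1.x and replaces &lt;code&gt;readData&lt;/code&gt; with a hypothetical &lt;code&gt;simulatedRead&lt;/code&gt; that just sleeps, while two counters (also made up for this example) record how many simulated reads are in flight at once.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;import zio._
import zio.duration._

object BoundedParallelismSketch extends zio.App {
  // Hypothetical stand-in for readData: sleeps instead of touching Spark and
  // tracks how many simulated reads are running at the same time.
  def simulatedRead(path: String, inFlight: Ref[Int], maxSeen: Ref[Int]): URIO[clock.Clock, String] =
    for {
      current &amp;lt;- inFlight.updateAndGet(_ + 1)
      _       &amp;lt;- maxSeen.update(seen =&amp;gt; math.max(seen, current))
      _       &amp;lt;- ZIO.sleep(100.millis)
      _       &amp;lt;- inFlight.update(_ - 1)
    } yield s"data from $path"

  val filePaths: List[String] = (1 to 20).map(i =&amp;gt; s"path$i").toList

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    (for {
      inFlight &amp;lt;- Ref.make(0)
      maxSeen  &amp;lt;- Ref.make(0)
      // At most 5 simulated reads are allowed to run concurrently.
      results  &amp;lt;- ZIO.foreachParN(5)(filePaths)(simulatedRead(_, inFlight, maxSeen))
      peak     &amp;lt;- maxSeen.get
      _        &amp;lt;- console.putStrLn(s"read ${results.size} paths, peak concurrency = $peak")
    } yield ()).exitCode
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;All 20 paths are processed, but the reported peak should never go above 5. Swapping &lt;code&gt;foreachParN(5)&lt;/code&gt; for &lt;code&gt;foreachPar&lt;/code&gt; in this sketch removes that cap, which is a quick way to compare the two behaviours before trying them on a real cluster.&lt;/p&gt;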

&lt;h3&gt;
  
  
  Wrap up
&lt;/h3&gt;

&lt;p&gt;We hope that this article gives you a taste of why you should use ZIO and Spark together, with the help of ZparkIO, to take your Spark jobs to the next level of performance, safety and fun!&lt;/p&gt;

</description>
      <category>scala</category>
      <category>spark</category>
      <category>zio</category>
      <category>functional</category>
    </item>
  </channel>
</rss>
