<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Tal Doron</title>
    <description>The latest articles on Forem by Tal Doron (@dorongigaspaces).</description>
    <link>https://forem.com/dorongigaspaces</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F281369%2Fbb08fa9a-47fc-416a-af27-7e415a5959b9.jpeg</url>
      <title>Forem: Tal Doron</title>
      <link>https://forem.com/dorongigaspaces</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/dorongigaspaces"/>
    <language>en</language>
    <item>
      <title>Merging Transactional and Predictive Analytics to Scale IoT</title>
      <dc:creator>Tal Doron</dc:creator>
      <pubDate>Thu, 28 Nov 2019 01:58:46 +0000</pubDate>
      <link>https://forem.com/dorongigaspaces/merging-transactional-and-predictive-analytics-to-scale-iot-1k2f</link>
      <guid>https://forem.com/dorongigaspaces/merging-transactional-and-predictive-analytics-to-scale-iot-1k2f</guid>
      <description>&lt;p&gt;&lt;em&gt;This deep dive into InsightEdge shows how it can be used for predictive analytics at scale, aggregating IoT data so it can be used for decision making.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;The digital universe is estimated to see a 50-fold data increase in the 2010-2020 decade. &lt;a href="http://www.gartner.com/newsroom/id/3165317"&gt;Gartner expects 6.4 billion connected things&lt;/a&gt; will be in use worldwide in 2018, up 30 percent from 2015, and will reach 20.8 billion by 2020.&lt;/p&gt;

&lt;p&gt;In every respect, Big Data is bigger than you can imagine, but moreover – &lt;strong&gt;it’s accelerating&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;When it comes to the IoT, this involves an increasing number of complex projects encompassing hundreds of suppliers, devices, and technologies. &lt;a href="https://www.forrester.com/Michele-Pelino"&gt;Michele Pelino&lt;/a&gt; and &lt;a href="https://www.forrester.com/Frank-E.-Gillett"&gt;Frank E. Gillett&lt;/a&gt; from &lt;a href="https://www.cloudera.com/content/dam/www/static/documents/analyst-reports/forrester-the-iot-heat-map.pdf"&gt;Forrester&lt;/a&gt; predict fleet management in transportation, security and surveillance applications in government, inventory and warehouse management applications in retail and industrial asset management in primary manufacturing will be the hottest areas for IoT growth.&lt;/p&gt;

&lt;p&gt;The impact of increasing the amount of data is the increase in velocity in which we have to ingest that data, perform data analysis and filter the relevant information. With a stream of millions of events per second coming in from IoT devices, organizations must equip themselves with flexible, comprehensive and cost-effective solutions for their IoT needs.&lt;/p&gt;

&lt;p&gt;At GigaSpaces, we’ve come to a realization that the solution to this growing need is not radically changing an existing architecture, but rather extending it through in-memory computing to enable fast analytics and control against fast data. The combination of low latency streaming analytics, along with transactional workflow triggers, enables acting on IoT data in the moment. This includes predictive maintenance and anomaly detection again millions of sensor data points.&lt;/p&gt;

&lt;h1&gt;
  
  
  InsightEdge Fuels Magic’s Predictive Engines for All IoT Needs
&lt;/h1&gt;

&lt;h2&gt;
  
  
  The Challenge
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://www.magicsoftware.com/"&gt;Magic Software Enterprises&lt;/a&gt;, a global provider of enterprise-grade application development and business process integration software solutions and a vendor of a broad range of software and IT services, has been leveraging GigaSpaces XAP for years.&lt;/p&gt;

&lt;p&gt;When Magic came out with their &lt;a href="https://www.magicsoftware.com/magic-xpi-integration-platform"&gt;xpi Integration Platform&lt;/a&gt;, they were looking for a data aggregation solution to form an IoT Hub in front of Magic xpi. This solution needed to be flexible enough to meet a variety of applications regardless of the data and velocity requirements.&lt;/p&gt;

&lt;p&gt;In the age of fast data, the xpi platform, although proving operational interoperability, it still faces the challenge of many existing platforms that are not ready to handle fast data ingestion scenarios. Magic was looking for a POC which could be implemented as quickly as possible while delivering fast results.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Solution
&lt;/h2&gt;

&lt;p&gt;InsightEdge was the perfect choice to help the Magic IoT solutions handle all the difficult data transformation challenges, allowing customers to concentrate on designing the best processes and flows to support their business goals. The solution needed to be flexible and open to any type of data input, regardless of the type and structure of the data, the velocity, running in-memory. That’s where we came in. During our meeting, we suggested a simple solution based on Kafka and InsightEdge to help facilitate data velocity and variety in IoT use cases.&lt;/p&gt;

&lt;p&gt;By integrating &lt;a href="https://insightedge.io/?_ga=2.207705618.1499143694.1495626662-899770352.1468839144"&gt;InsightEdge&lt;/a&gt; in-memory streaming technology, incoming sensor data is analyzed through a multitude of predefined filters and rules and aggregated by InsightEdge. The aggregated data is easily compared, correlated and merged and is transferred in batches to Magic xpi, where a prediction engine is first to predict when IoT equipment failure might occur and to prevent the occurrence of the failure by performing maintenance. Monitoring for future failure allows maintenance to be planned before the failure occurs.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--qg4DU7qp--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252664-insightedge-solution.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--qg4DU7qp--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252664-insightedge-solution.png" alt="Image 1"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Benefits
&lt;/h2&gt;

&lt;p&gt;InsightEdge provides Magic with a few key benefits:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Performance:&lt;/strong&gt; Ability to ingest fast data from multiple IoT sensors.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Data Aggregation:&lt;/strong&gt; InsightEdge is able to handle streaming sensor data at high throughput and aggregate it in time windows that are relevant to each sensor’s notification rhythm.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Fast Data Storage:&lt;/strong&gt; The streamed data then becomes structured into a semantically-rich data model that can be queried from any application.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Simplification of Big Data Architecture:&lt;/strong&gt; InsightEdge easily enables Magic to combine the power of Apache Spark and Fast Data analytics without the need for large-scale data source integration or data replication (ETL).&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Results
&lt;/h2&gt;

&lt;p&gt;Using InsightEdge, Magic is able to provide its customers with fast data streaming and the ability to perform aggregations and calculation capabilities on the in-memory grid. Using the XAP data grid makes the streaming process it that much faster, hence eliminating the need for Hadoop.&lt;/p&gt;

&lt;p&gt;InsightEdge facilities Magic’s customer needs for the IoT deployments with predictive manufacturing and maintenance, enabling them to receive real-time, fast, data-driven events from their systems.&lt;/p&gt;

&lt;h1&gt;
  
  
  InsightEdge Use Case: Car Telemetry Ingestion and Data Prediction Using Magic’s xpi
&lt;/h1&gt;

&lt;p&gt;A live InsightEdge use case is Car Telemetry Ingestion and Data Prediction using Magic’s xpi. In the case of car telemetry, it is very hard to predict in advance what data will be useful. In the case of data prediction, we need to think about not only device telemetry but also diagnostic telemetry.&lt;/p&gt;

&lt;p&gt;Predictive car maintenance requires car telemetry ingestion and data prediction. Magic’s solution stack needed one more component in the architecture to be fully compliant with fast data and scalable scenarios, assured innovation was needed and the correct puzzle piece to fit.&lt;/p&gt;

&lt;p&gt;In this use case, we will cover post-data-collection (assuming we have CSV files but could have been streaming all the same) and up until the data sent to Magic’s xpi Integration Platform.&lt;/p&gt;

&lt;h1&gt;
  
  
  How We Built It
&lt;/h1&gt;

&lt;h2&gt;
  
  
  Kafka
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://kafka.apache.org/intro"&gt;Apache Kafka&lt;/a&gt; is a distributed streaming platform, or a reliable message broker on steroids but not limited to just that. It enables building real-time streaming data pipelines that reliably get data between systems or applications and building real-time streaming applications that transform or react to the streams of data.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--bA9D_1CL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252663-htap-blog-images.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--bA9D_1CL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252663-htap-blog-images.png" alt="Image 2"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Kafdrop
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://github.com/obsidiandynamics/kafdrop"&gt;Kafdrop&lt;/a&gt; is a simple UI monitoring tool for message brokers. In this case, we will use it for Kafka to moderate the topics and messages content during development. Download the &lt;a href="https://hub.docker.com/r/obsidiandynamics/kafdrop"&gt;Kafdrop Docker image&lt;/a&gt; and run using the instructions on DockerHub.&lt;/p&gt;

&lt;h2&gt;
  
  
  InsightEdge
&lt;/h2&gt;

&lt;p&gt;InsightEdge is a high-performance Spark distribution designed for low latency workloads and extreme analytics processing in one unified solution. With a robust analytics capacity and virtually no latency, InsightEdge provides immediate results.&lt;br&gt;
GigaSpaces’ Spark distribution eliminates dependency on Hadoop Distributed File System (HDFS) so as to break through the embedded performance “glass ceiling” of the “stranded” Spark offering. To this, GigaSpaces has added enterprise-grade features, such as high-availability and security. The result is a hardened Spark distribution that is thirty times faster than standard Spark.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://insightedge.io/transportation/"&gt;Download InsightEdge here&lt;/a&gt;. No installation needed, simply unzip the file to the desired location. (Note: Not compatible with Windows)&lt;/p&gt;
&lt;h1&gt;
  
  
  Code Deep-Dive
&lt;/h1&gt;
&lt;h2&gt;
  
  
  Admin Work
&lt;/h2&gt;

&lt;p&gt;First, we need to start Kafka and InsightEdge, so we’ll use the following two scripts:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;1.&lt;/strong&gt; start-kafka.sh&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--8-DNU7tS--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252665-htap-blog-images-3-1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--8-DNU7tS--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252665-htap-blog-images-3-1.png" alt="Image 3"&gt;&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/usr/bin/env bash&lt;/span&gt;
&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"KAFKA_HOME=&lt;/span&gt;&lt;span class="nv"&gt;$KAFKA_HOME&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;
&lt;span class="nb"&gt;mkdir&lt;/span&gt; &lt;span class="nv"&gt;$KAFKA_HOME&lt;/span&gt;/logs

&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;ZLOGS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nv"&gt;$KAFKA_HOME&lt;/span&gt;/logs/zookeeper.log
&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"Starting ZooKeeper, logs: &lt;/span&gt;&lt;span class="nv"&gt;$ZLOGS&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;
&lt;span class="nb"&gt;nohup&lt;/span&gt; &lt;span class="nv"&gt;$KAFKA_HOME&lt;/span&gt;/bin/zookeeper-server-start.sh &lt;span class="nv"&gt;$KAFKA_HOME&lt;/span&gt;/config/zookeeper.properties &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;$ZLOGS&lt;/span&gt; 2&amp;gt;&amp;amp;1 &amp;amp;

&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"Waiting 10 seconds..."&lt;/span&gt;
&lt;span class="nb"&gt;sleep &lt;/span&gt;10

&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;KLOGS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nv"&gt;$KAFKA_HOME&lt;/span&gt;/logs/kafka.log
&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"Starting Kafka, logs: &lt;/span&gt;&lt;span class="nv"&gt;$KLOGS&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;
&lt;span class="nb"&gt;nohup&lt;/span&gt; &lt;span class="nv"&gt;$KAFKA_HOME&lt;/span&gt;/bin/kafka-server-start.sh &lt;span class="nv"&gt;$KAFKA_HOME&lt;/span&gt;/config/server.properties &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;$KLOGS&lt;/span&gt; 2&amp;gt;&amp;amp;1 &amp;amp;

&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"Waiting 15 seconds..."&lt;/span&gt;
&lt;span class="nb"&gt;sleep &lt;/span&gt;15

&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"Creating Kafka topics for car_events"&lt;/span&gt;
&lt;span class="nv"&gt;$KAFKA_HOME&lt;/span&gt;/bin/kafka-topics.sh &lt;span class="nt"&gt;--create&lt;/span&gt; &lt;span class="nt"&gt;--zookeeper&lt;/span&gt; 127.0.0.1:2181 &lt;span class="nt"&gt;--replication-factor&lt;/span&gt; 1 &lt;span class="nt"&gt;--partitions&lt;/span&gt; 1 &lt;span class="nt"&gt;--topic&lt;/span&gt; car_events
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;2.&lt;/strong&gt; start-insightedge.sh&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--uI36WNmZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252666-htap-blog-images-4.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--uI36WNmZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252666-htap-blog-images-4.png" alt="Image 4"&gt;&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/usr/bin/env bash&lt;/span&gt;
&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"INSIGHTEDGE_HOME=&lt;/span&gt;&lt;span class="nv"&gt;$INSIGHTEDGE_HOME&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;
&lt;span class="nv"&gt;$INSIGHTEDGE_HOME&lt;/span&gt;/sbin/insightedge.sh &lt;span class="nt"&gt;--mode&lt;/span&gt; demo
&lt;span class="nv"&gt;$INSIGHTEDGE_HOME&lt;/span&gt;/datagrid/bin/gs-webui.sh &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; /dev/null 2&amp;gt;&amp;amp;1 &amp;amp;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;3.&lt;/strong&gt; Next, we’ll start Kafdrop so we can have a UI on our Kafka broker. Launch Kafdrop using Docker:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;docker run &lt;span class="nt"&gt;-it&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; 9000:9000 &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KAFKA_BROKERCONNECT&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&amp;lt;host:port&amp;gt; &lt;span class="se"&gt;\&lt;/span&gt;
    obsidiandynamics/kafdrop
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Browse to the local instance to make sure it works &lt;a href="http://localhost:9000"&gt;link: http://localhost:9000&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--PDJeQH9q--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://i.imgur.com/cXYJ6Tv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--PDJeQH9q--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://i.imgur.com/cXYJ6Tv.png" alt="Image 5"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;4.&lt;/strong&gt; Last but not least, we need to start an HTTP server stub (to later be replaced with some other integration endpoint), call the start-http-server.sh:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;
&lt;span class="c"&gt;#!/usr/bin/env bash&lt;/span&gt;
&lt;span class="c"&gt;# a one-line HTTP server ;) &lt;/span&gt;
&lt;span class="k"&gt;while &lt;/span&gt;&lt;span class="nb"&gt;true&lt;/span&gt; &lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;do &lt;/span&gt;nc &lt;span class="nt"&gt;-l&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; 80 &lt;span class="nt"&gt;-c&lt;/span&gt; &lt;span class="s1"&gt;'echo -e "HTTP/1.1 200 OK\n\n Hello, World $(date)"'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;done&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Model Class
&lt;/h2&gt;

&lt;p&gt;Now we’ll have to build our model, so let’s see how it should look like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.magic.insightedge.model&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.Date&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.insightedge.scala.annotation.SpaceId&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;play.api.libs.json._&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;play.api.libs.json.&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;Json&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;Writes&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;scala.beans.BeanProperty&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.insightedge.scala.annotation._&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;scala.beans.&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;BeanProperty&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;BooleanBeanProperty&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;abstract&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MagicEvent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ID&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Int&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;this&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;this&lt;/span&gt;&lt;span class="o"&gt;(-&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
    &lt;span class="nd"&gt;@BeanProperty&lt;/span&gt; &lt;span class="nd"&gt;@SpaceId&lt;/span&gt;
    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="nc"&gt;ID&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Int&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nd"&gt;@BeanProperty&lt;/span&gt;
    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="nc"&gt;COL1&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nd"&gt;@BeanProperty&lt;/span&gt;
    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="nc"&gt;COL2&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nd"&gt;@BooleanBeanProperty&lt;/span&gt;
    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="nc"&gt;IsSentByHttp&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Boolean&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;MagicEvent&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;this&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;this&lt;/span&gt;&lt;span class="o"&gt;(-&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mf"&gt;1.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Next, we write our event class (to handle incoming events):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.magic.events&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.io.Serializable&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;play.api.libs.json.&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;Json&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;Writes&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;object&lt;/span&gt; &lt;span class="nc"&gt;Events&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ID&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Int&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
        &lt;span class="nc"&gt;COL1&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
        &lt;span class="nc"&gt;COL2&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
        &lt;span class="nc"&gt;IsSentByHttp&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Boolean&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;implicit&lt;/span&gt; &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;CarEventWrites&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Writes&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;writes&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;carEvent&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;Json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;obj&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="s"&gt;"ID"&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;carEvent&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;ID&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="s"&gt;"COL1"&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;carEvent&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;COL1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="s"&gt;"COL2"&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;carEvent&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;COL2&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="s"&gt;"IsSentByHttp"&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;carEvent&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;IsSentByHttp&lt;/span&gt;
        &lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Code
&lt;/h2&gt;

&lt;p&gt;Now that we have our Model and Event-Model we can write the code we want to deploy to Spark (which will actually read from Kafka to Spark and persist to the grid):&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--3yQBHlMW--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252668-htap-blog-images-5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--3yQBHlMW--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252668-htap-blog-images-5.png" alt="Image 6"&gt;&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.magic.insightedge&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.magic.events.Events&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.magic.events.Events.CarEvent&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;kafka.serializer.StringDecoder&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.log4j.&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;Level&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;Logger&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.spark.storage.StorageLevel&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.spark.streaming.dstream.DStream&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.spark.streaming.kafka.KafkaUtils&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.spark.streaming.&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;Seconds&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;StreamingContext&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.spark.SparkConf&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.insightedge.spark.context.InsightEdgeConfig&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.insightedge.spark.implicits.all._&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.rogach.scallop.ScallopConf&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;play.api.libs.json.Json&lt;/span&gt;

&lt;span class="k"&gt;object&lt;/span&gt; &lt;span class="nc"&gt;EventsStreamApp&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;implicit&lt;/span&gt; &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;carEventReads&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;Json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;reads&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;main&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Array&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;])&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Unit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Starting Car Events Stream"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt; &lt;span class="s"&gt;"with params: ${args.toList}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;conf&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Conf&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt; &lt;span class="s"&gt;"conf=${conf}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt; &lt;span class="s"&gt;"checkpointDir=${conf.checkpointDir()}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;ssc&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;StreamingContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getOrCreate&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;conf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;checkpointDir&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;createContext&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;conf&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="nv"&gt;ssc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;start&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="nv"&gt;ssc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;awaitTermination&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"done"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Conf&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Array&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;])&lt;/span&gt; &lt;span class="k"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;ScallopConf&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;masterUrl&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;opt&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="s"&gt;"master-url"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;required&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;spaceName&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;opt&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="s"&gt;"space-name"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;required&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;lookupGroups&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;opt&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="s"&gt;"lookup-groups"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;required&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;lookupLocators&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;opt&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="s"&gt;"lookup-locators"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;required&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;zookeeper&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;opt&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="s"&gt;"zookeeper"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;required&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;groupId&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;opt&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="s"&gt;"group-id"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;required&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;batchDuration&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;opt&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="s"&gt;"batch-duration"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;required&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;checkpointDir&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="n"&gt;opt&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="s"&gt;"checkpoint-dir"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;required&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;verify&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;createContext&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;conf&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Conf&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;StreamingContext&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;ieConfig&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;InsightEdgeConfig&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;conf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;spaceName&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="nc"&gt;Some&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;conf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;lookupGroups&lt;/span&gt;&lt;span class="o"&gt;()),&lt;/span&gt; &lt;span class="nc"&gt;Some&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;conf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;lookupLocators&lt;/span&gt;&lt;span class="o"&gt;()))&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;scConfig&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;SparkConf&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="py"&gt;setAppName&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"EventsStream"&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;setMaster&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;conf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;masterUrl&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="py"&gt;setInsightEdgeConfig&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ieConfig&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;kafkaParams&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"zookeeper.connect"&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;conf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;zookeeper&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="s"&gt;"group.id"&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;conf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;groupId&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;ssc&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;StreamingContext&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;scConfig&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Seconds&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;conf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;batchDuration&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="py"&gt;toInt&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="nv"&gt;ssc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;checkpoint&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;conf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;checkpointDir&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;sc&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;ssc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;sparkContext&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;rootLogger&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;Logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getRootLogger&lt;/span&gt;
        &lt;span class="nv"&gt;rootLogger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;setLevel&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;Level&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;ERROR&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="c1"&gt;// open Kafka streams&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;carStream&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;createCarStream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ssc&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;kafkaParams&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;carStream&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;mapPartitions&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;partitions&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt;
                &lt;span class="nv"&gt;partitions&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;e&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt;
                        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"_______________________________- "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                    &lt;span class="nv"&gt;model&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                        &lt;span class="nv"&gt;e&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;ID&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                        &lt;span class="nv"&gt;e&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;COL1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                        &lt;span class="nv"&gt;e&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;COL2&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                        &lt;span class="nv"&gt;e&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;IsSentByHttp&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;saveToGrid&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;ssc&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;createCarStream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ssc&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;StreamingContext&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;kafkaParams&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;, &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;])&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DStream&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nv"&gt;KafkaUtils&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;createStream&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;, &lt;span class="kt"&gt;String&lt;/span&gt;, &lt;span class="kt"&gt;StringDecoder&lt;/span&gt;, &lt;span class="kt"&gt;StringDecoder&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="n"&gt;ssc&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;kafkaParams&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"car_events"&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt; &lt;span class="nv"&gt;StorageLevel&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;MEMORY_ONLY&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;_&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;_2&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;Json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;parse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;as&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;])&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now, we have three options of running logic:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Spark job&lt;/li&gt;
&lt;li&gt;Grid job (event-driven)&lt;/li&gt;
&lt;li&gt;External job against the Grid (which temporarily “holds” the data for Spark)&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;We chose to go with the third option as we have scalability and growth considerations. We need to take into account dozen of external processes running rather than one very long event on the grid. It’s a simple push/pull decision and we’ve decided to pull (If you wish to implement a Processing Unit (PU), see appendix 1).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--6UFLtTnG--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252669-htap-blog-images-6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--6UFLtTnG--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252669-htap-blog-images-6.png" alt="Imagee 7"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Executing an external job against the Grid is an interesting choice altogether because it is something that can be initiated from any integration end-point, starting with a simple JAR file and implementing it in any custom made code or integration platform:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.magic&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.io.BufferedReader&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.io.DataOutputStream&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.io.IOException&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.io.InputStreamReader&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.net.HttpURLConnection&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.net.URL&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.http.HttpEntity&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.http.HttpResponse&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.http.client.HttpClient&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.http.client.methods.HttpPost&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.http.entity.mime.MultipartEntityBuilder&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.http.impl.client.DefaultHttpClient&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.openspaces.core.GigaSpace&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.openspaces.core.GigaSpaceConfigurer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.openspaces.core.space.UrlSpaceConfigurer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.j_spaces.core.client.SQLQuery&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.magic.insightedge.model.CarEvent&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CarEventDispatcher&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;postUrl&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;main&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;InterruptedException&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;length&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"\tNot enough arguments. Usage: java -jar events-dispatcher.jar "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
                               &lt;span class="s"&gt;"&amp;lt;post-url&amp;gt; &amp;lt;remove-dispatched-from-space true|false&amp;gt;"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;exit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;postUrl&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;];&lt;/span&gt;
        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"\tPOST URL: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;postUrl&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="c1"&gt;// remove dispatched from the space or update field 'isSentByHttp' to 'true'&lt;/span&gt;
        &lt;span class="kt"&gt;boolean&lt;/span&gt; &lt;span class="n"&gt;removeDispatchedFromSpace&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Boolean&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;parseBoolean&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;]);&lt;/span&gt;
        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"\tRemove dispatched events from space: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;removeDispatchedFromSpace&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;UrlSpaceConfigurer&lt;/span&gt; &lt;span class="n"&gt;spaceConfigurer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;UrlSpaceConfigurer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"jini://localhost/*/insightedge-space"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;GigaSpace&lt;/span&gt; &lt;span class="n"&gt;gigaSpace&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;GigaSpaceConfigurer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;spaceConfigurer&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;gigaSpace&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;SQLQuery&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;SQLQuery&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="nc"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"isSentByHttp = false AND WIP = 0"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;removeDispatchedFromSpace&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;gigaSpace&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;takeMultiple&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;gigaSpace&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readMultiple&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;length&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Dispatching "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;length&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;" car events from space "&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CarEvent&lt;/span&gt; &lt;span class="nl"&gt;e:&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Posting "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
                &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;httpPost&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;postUrl&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
                    &lt;span class="nc"&gt;Thread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;sleep&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
                &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;IOException&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;printStackTrace&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
                &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;finally&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;removeDispatchedFromSpace&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                        &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setIsSentByHttp&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
                        &lt;span class="n"&gt;gigaSpace&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;write&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
                    &lt;span class="o"&gt;}&lt;/span&gt;
                &lt;span class="o"&gt;}&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Nothing to dispatch, 0 events in space "&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;httpPost&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;postUrl&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;IOException&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;HttpClient&lt;/span&gt; &lt;span class="n"&gt;httpclient&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;DefaultHttpClient&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;HttpPost&lt;/span&gt; &lt;span class="n"&gt;httppost&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;HttpPost&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;postUrl&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;HttpEntity&lt;/span&gt; &lt;span class="n"&gt;entity&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;MultipartEntityBuilder&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;create&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;addTextBody&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"appname"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"IFSIoTDemo"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;addTextBody&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"prgname"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"HTTP"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;addTextBody&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"arguments"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"-AHTTP_IoTDemo#RobotTransmission,TransmissionXML"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;addTextBody&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"TransmissionXML"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;httppost&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setEntity&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;entity&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;HttpResponse&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;httpclient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;execute&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;httppost&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"HTTP response code = "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getStatusLine&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getStatusCode&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="nc"&gt;BufferedReader&lt;/span&gt; &lt;span class="n"&gt;respReader&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;BufferedReader&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;InputStreamReader&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getEntity&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getContent&lt;/span&gt;&lt;span class="o"&gt;()));&lt;/span&gt;
        &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;line&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="o"&gt;((&lt;/span&gt;&lt;span class="n"&gt;line&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;respReader&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readLine&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;line&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;To run the above code, we’ll use a simple script that will call the generated JAR file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/usr/bin/env bash&lt;/span&gt;
&lt;span class="c"&gt;# Run dispatcher every 5 seconds&lt;/span&gt;
&lt;span class="nv"&gt;SLEEP_TIME&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;5
&lt;span class="nv"&gt;POST_URL&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;http://cloud.magicsoftware.de/Magicxpi4.5/MgWebRequester.dll
&lt;span class="c"&gt;# whether to delete dispatched events from the space OR update 'isSentByHttp' field. Possible values: 'true' - remove, 'false' - update&lt;/span&gt;
&lt;span class="nv"&gt;REMOVE_DISPATCHED_FROM_SPACE&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;false
echo&lt;/span&gt; &lt;span class="s2"&gt;" DISPATCHER will run every &lt;/span&gt;&lt;span class="nv"&gt;$SLEEP_TIME&lt;/span&gt;&lt;span class="s2"&gt; seconds:"&lt;/span&gt;
&lt;span class="k"&gt;while &lt;/span&gt;&lt;span class="nb"&gt;true
&lt;/span&gt;&lt;span class="k"&gt;do 
&lt;/span&gt;&lt;span class="nb"&gt;sleep&lt;/span&gt; &lt;span class="nv"&gt;$SLEEP_TIME&lt;/span&gt; 
&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"*** Running dispatcher: "&lt;/span&gt;
java &lt;span class="nt"&gt;-jar&lt;/span&gt; ../events-dispatcher/target/events-dispatcher.jar &lt;span class="nv"&gt;$POST_URL&lt;/span&gt; &lt;span class="nv"&gt;$REMOVE_DISPATCHED_FROM_SPACE&lt;/span&gt;
&lt;span class="k"&gt;done&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;As a bonus, here’s a code to remove all the car events from the space (the Grid that is):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.magic&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.j_spaces.core.client.SQLQuery&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.magic.insightedge.model.CarEvent&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.openspaces.core.GigaSpace&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.openspaces.core.GigaSpaceConfigurer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.openspaces.core.space.UrlSpaceConfigurer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;DeleteCarEventsUtil&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;main&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"DELETING all CarEvent's from the space"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;UrlSpaceConfigurer&lt;/span&gt; &lt;span class="n"&gt;spaceConfigurer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;UrlSpaceConfigurer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"jini://localhost/*/insightedge-space"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;GigaSpace&lt;/span&gt; &lt;span class="n"&gt;gigaSpace&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;GigaSpaceConfigurer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;spaceConfigurer&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;gigaSpace&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;SQLQuery&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;SQLQuery&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="nc"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;""&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;gigaSpace&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;takeMultiple&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now that we can dispatch the events from the Grid, let’s work backward on deploying the spark job so it can listen on the Kafka topic and ingest the data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/usr/bin/env bash&lt;/span&gt;
&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"INSIGHTEDGE_HOME=&lt;/span&gt;&lt;span class="nv"&gt;$INSIGHTEDGE_HOME&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;
&lt;span class="nv"&gt;streamJar&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"../events-streaming/target/events-streaming.jar"&lt;/span&gt;
&lt;span class="nv"&gt;ieHost&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;localhost
&lt;span class="nv"&gt;zookeeper&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;localhost:2181
&lt;span class="nb"&gt;nohup&lt;/span&gt; &lt;span class="nv"&gt;$INSIGHTEDGE_HOME&lt;/span&gt;/bin/insightedge-submit &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--class&lt;/span&gt; com.magic.insightedge.EventsStreamApp &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--master&lt;/span&gt; spark://&lt;span class="nv"&gt;$ieHost&lt;/span&gt;:7077 &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--executor-cores&lt;/span&gt; 2 &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nv"&gt;$streamJar&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--master-url&lt;/span&gt; spark://&lt;span class="nv"&gt;$ieHost&lt;/span&gt;:7077 &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--zookeeper&lt;/span&gt; &lt;span class="nv"&gt;$zookeeper&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--group-id&lt;/span&gt; events-processing &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--space-name&lt;/span&gt; insightedge-space &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--lookup-groups&lt;/span&gt; insightedge &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--lookup-locators&lt;/span&gt; &lt;span class="nv"&gt;$ieHost&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--batch-duration&lt;/span&gt; 1 &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;--checkpoint-dir&lt;/span&gt; &lt;span class="s2"&gt;"C1"&lt;/span&gt; &amp;amp;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now we have everything running except two things: The code on collecting the data from the CSV file and how to run it. Here’s the code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.magic.producer&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.Properties&lt;/span&gt;
&lt;span class="k"&gt;object&lt;/span&gt; &lt;span class="nc"&gt;ProducerApp&lt;/span&gt; &lt;span class="k"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;App&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"-- Running kafka producer"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"-- Arguments: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nv"&gt;args&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;mkString&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"["&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;","&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"]"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
    &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;delim&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"="&lt;/span&gt;
    &lt;span class="nv"&gt;args&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;find&lt;/span&gt;&lt;span class="o"&gt;(!&lt;/span&gt;&lt;span class="nv"&gt;_&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;contains&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;delim&lt;/span&gt;&lt;span class="o"&gt;)).&lt;/span&gt;&lt;span class="py"&gt;foreach&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;a&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt;
        &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;IllegalArgumentException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Incorrect argument "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;a&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
    &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;mapArgs&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;args&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;a&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nv"&gt;a&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;trim&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;split&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;delim&lt;/span&gt;&lt;span class="o"&gt;)).&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;a&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;a&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;a&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)).&lt;/span&gt;&lt;span class="py"&gt;toMap&lt;/span&gt;
    &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;kafkaConfig&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;props&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="nv"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BOOTSTRAP_SERVERS&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;mapArgs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getOrElse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;BOOTSTRAP_SERVERS&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="nv"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"key.serializer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"org.apache.kafka.common.serialization.StringSerializer"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="nv"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"value.serializer"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"org.apache.kafka.common.serialization.StringSerializer"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="nv"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CSV_LOCATION&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;mapArgs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getOrElse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CSV_LOCATION&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"/home/opt/magic/gigaspaces-insightedge-1.0.0-premium/temp2.csv"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="nv"&gt;props&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TOPIC_CAR&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;mapArgs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getOrElse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TOPIC_CAR&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"car_events"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="n"&gt;props&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt; &lt;span class="s"&gt;"-- kafkaConfig=$kafkaConfig"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="nv"&gt;CSVProducer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;run&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafkaConfig&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;So you might be asking yourselves what is the CSVProducer? Well, here it is:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--fyFzzXAl--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252670-htap-blog-images-7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--fyFzzXAl--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dzone.com/storage/temp/12252670-htap-blog-images-7.png" alt="Image 8"&gt;&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.magic.producer&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;scala.io&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;scala.util.Random&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;UUID&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.magic.events.Events._&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.producer.&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;play.api.libs.json.Json&lt;/span&gt;

&lt;span class="k"&gt;object&lt;/span&gt; &lt;span class="nc"&gt;CSVProducer&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;run&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafkaConfig&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Properties&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;Unit&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"-- Running CSV producer"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"------- first arg:"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="nv"&gt;kafkaConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CSV_LOCATION&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;bufferedSource&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;io&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;Source&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;fromFile&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;kafkaConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CSV_LOCATION&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;producer&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;KafkaProducer&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;, &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="n"&gt;kafkaConfig&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;topic&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;kafkaConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getProperty&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TOPIC_CAR&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="nv"&gt;Thread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;sleep&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="c1"&gt;//drop the headers first line&lt;/span&gt;
        &lt;span class="nf"&gt;for&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;line&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="nv"&gt;bufferedSource&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;getLines&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;drop&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;cols&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;line&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;split&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;","&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;_&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;trim&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="c1"&gt;// do whatever you want with the columns here&lt;/span&gt;
            &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;eventJson&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;Json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;toJson&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CarEvent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                &lt;span class="nf"&gt;cols&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;toInt&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                &lt;span class="nf"&gt;cols&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
                &lt;span class="nf"&gt;cols&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="py"&gt;toDouble&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;)).&lt;/span&gt;&lt;span class="py"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
            &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt; &lt;span class="s"&gt;"sending event to $topic $eventJson"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="nv"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;send&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ProducerRecord&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;String&lt;/span&gt;, &lt;span class="kt"&gt;String&lt;/span&gt;&lt;span class="o"&gt;](&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;eventJson&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
            &lt;span class="nv"&gt;producer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;flush&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
            &lt;span class="nf"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;s&lt;/span&gt; &lt;span class="s"&gt;"JSON is: $eventJson"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="nv"&gt;bufferedSource&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;close&lt;/span&gt;
        &lt;span class="nv"&gt;Thread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;sleep&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;//add two more zeros to wait a whole second&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Run it by using the following script:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/usr/bin/env bash&lt;/span&gt;
&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"INSIGHTEDGE_HOME=&lt;/span&gt;&lt;span class="nv"&gt;$INSIGHTEDGE_HOME&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;
&lt;span class="nv"&gt;producerJar&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"../events-producer/target/magic-events-producer.jar"&lt;/span&gt;
java &lt;span class="nt"&gt;-jar&lt;/span&gt; &lt;span class="nv"&gt;$producerJar&lt;/span&gt; csv.location&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"../KafkaInput.csv"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h1&gt;
  
  
  Final Thoughts
&lt;/h1&gt;

&lt;p&gt;It is estimated that by 2017, &lt;a href="http://digitalistmag.wpengine.netdna-cdn.com/files/2016/03/IDC_IoT_white_paper_Mar2016.pdf"&gt;60% of global manufacturers will use analytics&lt;/a&gt; to sense and analyze data from connected products and manufacturing and optimize increasingly complex portfolios of products. By 2018, the proliferation of advanced, purpose-built, analytic applications aligned with the IoT will result in 15% productivity improvements for manufacturers regarding innovation delivery and supply chain performance.&lt;/p&gt;

&lt;p&gt;The flexibility of combining transactional and analytics functionality provided by InsightEdge and XAP is what separates GigaSpaces from the rest. With Magic’s use case, we are enabling IoT applications at scale through open source components at the center, edge, and cloud.&lt;/p&gt;

&lt;p&gt;Gigaspaces newest data product, InsightEdge offers an Apache Spark-empowered analytics platform to help facilitate full-spectrum analytics (streaming, machine learning, and graph processing) in IoT use cases. We are happy to integrate into Magic’s solution stack, which required full compliance with fast data and scalable scenarios.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>gigaspaces</category>
      <category>docker</category>
      <category>scala</category>
    </item>
  </channel>
</rss>
