<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Imre Aranyosi</title>
    <description>The latest articles on Forem by Imre Aranyosi (@glasstiger).</description>
    <link>https://forem.com/glasstiger</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1060248%2Fe55508de-1ab0-4642-9151-c6dfdb638756.png</url>
      <title>Forem: Imre Aranyosi</title>
      <link>https://forem.com/glasstiger</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/glasstiger"/>
    <language>en</language>
    <item>
      <title>Integrate Apache Spark and QuestDB for Time-Series Analytics</title>
      <dc:creator>Imre Aranyosi</dc:creator>
      <pubDate>Thu, 06 Apr 2023 16:40:46 +0000</pubDate>
      <link>https://forem.com/glasstiger/integrate-apache-spark-and-questdb-for-time-series-analytics-3i3n</link>
      <guid>https://forem.com/glasstiger/integrate-apache-spark-and-questdb-for-time-series-analytics-3i3n</guid>
      <description>&lt;p&gt;&lt;a href="https://spark.apache.org/"&gt;Spark&lt;/a&gt; is an analytics engine for large-scale data engineering. Despite its long history, it still has its well-deserved place in the big data landscape. QuestDB, on the other hand, is a time-series database with a very high data ingestion rate. This means that Spark desperately needs data, a lot of it! ...and QuestDB has it, a match made in heaven.&lt;/p&gt;

&lt;p&gt;Of course, there is pandas for data analytics! The key here is the expression &lt;em&gt;large-scale&lt;/em&gt;. Unlike pandas, Spark is a distributed system and can scale really well.&lt;/p&gt;

&lt;p&gt;What does this mean exactly?&lt;/p&gt;

&lt;p&gt;Let's take a look at how data is processed in Spark.&lt;/p&gt;

&lt;p&gt;For the purposes of this article, we only need to know that a Spark job consists of multiple tasks, and each task works on a single data partition. Tasks are executed in parallel in stages, distributed across the cluster. Each stage depends on the previous ones, so tasks belonging to different stages cannot run in parallel.&lt;/p&gt;
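&lt;p&gt;To build some intuition for tasks and partitions, here is a plain-Python analogy (not Spark code, just an illustration): each "task" processes one partition independently, and a "stage" completes once all of its tasks have finished.&lt;/p&gt;

```python
from concurrent.futures import ThreadPoolExecutor

# a "dataset" split into 4 partitions
partitions = [list(range(i * 100, (i + 1) * 100)) for i in range(4)]

def task(partition):
    # each task works on a single data partition
    return sum(partition)

# a "stage": its tasks run in parallel, and it completes when all of them do
with ThreadPoolExecutor() as executor:
    partial_sums = list(executor.map(task, partitions))

# a dependent "stage" combines the per-partition results
total = sum(partial_sums)
print(total)  # 79800
```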

&lt;p&gt;The schematic diagram below depicts an example job:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--T5Al-Apr--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/9u8t0z8cln2cecaxjnni.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--T5Al-Apr--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/9u8t0z8cln2cecaxjnni.png" alt="Diagram showing data exported to Spark as jobs" width="752" height="722"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this tutorial, we will load data from a QuestDB table into a Spark application and explore the inner working of Spark to refine data loading.&lt;br&gt;
Finally, we will modify and save the data back to QuestDB.&lt;/p&gt;
&lt;h2&gt;
  
  
  Loading data to Spark
&lt;/h2&gt;

&lt;p&gt;First things first, we need to load time-series data from QuestDB. I will use an existing table, &lt;code&gt;trades&lt;/code&gt;, with just over 1.3 million rows.&lt;/p&gt;

&lt;p&gt;It contains bitcoin trades spanning 3 days: not exactly a big data scenario, but good enough to experiment with.&lt;/p&gt;

&lt;p&gt;The table contains the following columns:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Column Name&lt;/th&gt;
&lt;th&gt;Column Type&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;symbol&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;SYMBOL&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;side&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;SYMBOL&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;price&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;DOUBLE&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;amount&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;DOUBLE&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;timestamp&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;TIMESTAMP&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;The table is partitioned by day, and the &lt;code&gt;timestamp&lt;/code&gt; column serves as the designated timestamp.&lt;/p&gt;

&lt;p&gt;QuestDB accepts connections via the Postgres wire protocol, so we can use JDBC to integrate. Spark applications can be written in various languages; here we will go with Python.&lt;/p&gt;

&lt;p&gt;Create the script, &lt;code&gt;sparktest.py&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;

&lt;span class="c1"&gt;# create Spark session
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;appName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"questdb_test"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;getOrCreate&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# load 'trades' table into the dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"jdbc"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"jdbc:postgresql://localhost:8812/questdb"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"driver"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"org.postgresql.Driver"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"admin"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"password"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"quest"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"dbtable"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"trades"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;load&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# print the number of rows
&lt;/span&gt;&lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

&lt;span class="c1"&gt;# do some filtering and print the first 3 rows of the data
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;symbol&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;'BTC-USD'&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nb"&gt;filter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;side&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;'buy'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;show&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;truncate&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Believe it or not, this tiny application already reads data from the database when submitted as a Spark job.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;spark-submit &lt;span class="nt"&gt;--jars&lt;/span&gt; postgresql-42.5.1.jar sparktest.py

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The job prints the following row count:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;1322570

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And these are the first 3 rows of the filtered table:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;+-------+----+--------+---------+--------------------------+
|symbol |side|price   |amount   |timestamp                 |
+-------+----+--------+---------+--------------------------+
|BTC-USD|buy |23128.72|6.4065E-4|2023-02-01 00:00:00.141334|
|BTC-USD|buy |23128.33|3.7407E-4|2023-02-01 00:00:01.169344|
|BTC-USD|buy |23128.33|8.1796E-4|2023-02-01 00:00:01.184992|
+-------+----+--------+---------+--------------------------+
only showing top 3 rows

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Although &lt;code&gt;sparktest.py&lt;/code&gt; speaks for itself, it is still worth mentioning that this application depends on the JDBC driver located in &lt;code&gt;postgresql-42.5.1.jar&lt;/code&gt;. The application cannot run without it, so the driver has to be submitted to Spark together with the application.&lt;/p&gt;
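&lt;p&gt;As a side note, if you would rather not download the driver jar manually, &lt;code&gt;spark-submit&lt;/code&gt; can also resolve it from Maven Central with the &lt;code&gt;--packages&lt;/code&gt; option (shown here with the same driver version as above):&lt;/p&gt;

```shell
# let Spark resolve the JDBC driver from Maven Central instead of a local jar
spark-submit --packages org.postgresql:postgresql:42.5.1 sparktest.py
```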

&lt;h2&gt;
  
  
  Optimizing data loading with Spark
&lt;/h2&gt;

&lt;p&gt;We have loaded data into Spark. Now we will look at how this was done and at some aspects worth considering.&lt;/p&gt;

&lt;p&gt;The easiest way to peek under the hood is to check QuestDB's log, which should tell us how Spark interacted with the database. We will also make use of the Spark UI, which displays useful insights into the execution, including stages and tasks.&lt;/p&gt;

&lt;h3&gt;
  
  
  Spark connection to QuestDB: Spark is lazy
&lt;/h3&gt;

&lt;p&gt;The QuestDB log shows that Spark connected to the database three times. For simplicity, I only show the relevant lines of the log:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;2023-03-21T21:12:24.031443Z I pg-server connected &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;ip&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;127.0.0.1, &lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129]
2023-03-21T21:12:24.060520Z I i.q.c.p.PGConnectionContext parse &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129, &lt;span class="nv"&gt;q&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;SELECT &lt;span class="k"&gt;*&lt;/span&gt; FROM trades WHERE &lt;span class="nv"&gt;1&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0]
2023-03-21T21:12:24.072262Z I pg-server disconnected &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;ip&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;127.0.0.1, &lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129, &lt;span class="nv"&gt;src&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;queue]

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Spark first queried the database when we created the DataFrame, but as it turns out, it was not too interested in the data itself. The query looked like this:&lt;br&gt;
&lt;code&gt;SELECT * FROM trades WHERE 1=0&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;The only thing Spark wanted to know was the schema of the table, in order to create an empty DataFrame. Spark evaluates expressions lazily and does only the bare minimum required at each step. After all, it is designed to analyze big data,&lt;br&gt;
so resources are incredibly precious to Spark. Especially memory: data is not cached by default.&lt;/p&gt;

&lt;p&gt;The second connection happened when Spark counted the rows of the DataFrame. It did not query the data this time, either. Interestingly, instead of pushing the aggregation down to the database by running &lt;code&gt;SELECT count(*) FROM trades&lt;/code&gt;, it&lt;br&gt;
just queried a &lt;code&gt;1&lt;/code&gt; for each record: &lt;code&gt;SELECT 1 FROM trades&lt;/code&gt;.&lt;br&gt;
Spark adds the &lt;code&gt;1&lt;/code&gt;s together to get the actual count.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;2023-03-21T21:12:25.692098Z I pg-server connected &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;ip&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;127.0.0.1, &lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129]
2023-03-21T21:12:25.693863Z I i.q.c.p.PGConnectionContext parse &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129, &lt;span class="nv"&gt;q&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;SELECT 1 FROM trades     &lt;span class="o"&gt;]&lt;/span&gt;
2023-03-21T21:12:25.695284Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;rowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;487309, &lt;span class="nv"&gt;partitionNameTxn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;2, &lt;span class="nv"&gt;transientRowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionIndex&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0, &lt;span class="nv"&gt;partitionCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;3]
2023-03-21T21:12:25.749986Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;rowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;457478, &lt;span class="nv"&gt;partitionNameTxn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1, &lt;span class="nv"&gt;transientRowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionIndex&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1, &lt;span class="nv"&gt;partitionCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;3]
2023-03-21T21:12:25.800765Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;rowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionNameTxn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0, &lt;span class="nv"&gt;transientRowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionIndex&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;2, &lt;span class="nv"&gt;partitionCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;3]
2023-03-21T21:12:25.881785Z I pg-server disconnected &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;ip&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;127.0.0.1, &lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129, &lt;span class="nv"&gt;src&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;queue]

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
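&lt;p&gt;If this per-row round trip ever becomes a concern, the aggregation can be pushed down manually by reading from a query instead of a table. Below is a minimal sketch using the JDBC data source's &lt;code&gt;query&lt;/code&gt; option (mutually exclusive with &lt;code&gt;dbtable&lt;/code&gt;); the connection options mirror the ones used earlier, and it assumes a running QuestDB instance:&lt;/p&gt;

```python
# let the database compute the count instead of shipping one row per record
count_df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("query", "SELECT count(*) AS cnt FROM trades") \
    .load()

# a single-row DataFrame holding the count computed by QuestDB
print(count_df.collect()[0]["cnt"])
```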



&lt;p&gt;Working with the data itself eventually forced Spark to get a taste of the table's content too. Filters are pushed down to the database by default.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;2023-03-21T21:12:26.132570Z I pg-server connected &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;ip&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;127.0.0.1, &lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;28]
2023-03-21T21:12:26.134355Z I i.q.c.p.PGConnectionContext parse &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;28, &lt;span class="nv"&gt;q&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;SELECT &lt;span class="s2"&gt;"symbol"&lt;/span&gt;,&lt;span class="s2"&gt;"side"&lt;/span&gt;,&lt;span class="s2"&gt;"price"&lt;/span&gt;,&lt;span class="s2"&gt;"amount"&lt;/span&gt;,&lt;span class="s2"&gt;"timestamp"&lt;/span&gt; FROM trades  WHERE &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"symbol"&lt;/span&gt; IS NOT NULL&lt;span class="o"&gt;)&lt;/span&gt; AND &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"side"&lt;/span&gt; IS NOT NULL&lt;span class="o"&gt;)&lt;/span&gt; AND &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"symbol"&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'BTC-USD'&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; AND &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"side"&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'buy'&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;   &lt;span class="o"&gt;]&lt;/span&gt;
2023-03-21T21:12:26.739124Z I pg-server disconnected &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;ip&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;127.0.0.1, &lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;28, &lt;span class="nv"&gt;src&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;queue]

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
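&lt;p&gt;Predicate pushdown is on by default, but the JDBC source lets you switch it off via the &lt;code&gt;pushDownPredicate&lt;/code&gt; option, should you ever want Spark to filter locally instead. A sketch, again assuming the same connection options and a running QuestDB instance:&lt;/p&gt;

```python
# disable predicate pushdown: Spark fetches all rows and filters them locally
df_no_pushdown = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:8812/questdb") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "admin").option("password", "quest") \
    .option("dbtable", "trades") \
    .option("pushDownPredicate", "false") \
    .load()
```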



&lt;p&gt;We can see that Spark's interaction with the database is rather sophisticated, and optimized to achieve good performance without wasting resources. The Spark DataFrame is the key component which takes care of the optimization, and it deserves some further analysis.&lt;/p&gt;

&lt;h3&gt;
  
  
  What is a Spark DataFrame?
&lt;/h3&gt;

&lt;p&gt;The name &lt;code&gt;DataFrame&lt;/code&gt; sounds like a container to hold data, but as we have seen earlier, this is not really the case. So what is a Spark DataFrame, then?&lt;/p&gt;

&lt;p&gt;One way to look at Spark SQL, with the risk of oversimplifying it, is that it is a query engine. &lt;code&gt;df.filter(predicate)&lt;/code&gt; is really just another way of saying&lt;br&gt;
&lt;code&gt;WHERE predicate&lt;/code&gt;. With this in mind, the DataFrame is pretty much a query, or actually more like a query plan.&lt;/p&gt;

&lt;p&gt;Most databases come with functionality to display query plans, and Spark has it too! Let's check the plan for the DataFrame we just created:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;explain&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;extended&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="o"&gt;==&lt;/span&gt; Parsed Logical Plan &lt;span class="o"&gt;==&lt;/span&gt;
Filter &lt;span class="o"&gt;(&lt;/span&gt;side#1 &lt;span class="o"&gt;=&lt;/span&gt; buy&lt;span class="o"&gt;)&lt;/span&gt;
+- Filter &lt;span class="o"&gt;(&lt;/span&gt;symbol#0 &lt;span class="o"&gt;=&lt;/span&gt; BTC-USD&lt;span class="o"&gt;)&lt;/span&gt;
   +- Relation &lt;span class="o"&gt;[&lt;/span&gt;symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation&lt;span class="o"&gt;(&lt;/span&gt;trades&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;numPartitions&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1]

&lt;span class="o"&gt;==&lt;/span&gt; Analyzed Logical Plan &lt;span class="o"&gt;==&lt;/span&gt;
symbol: string, side: string, price: double, amount: double, timestamp: timestamp
Filter &lt;span class="o"&gt;(&lt;/span&gt;side#1 &lt;span class="o"&gt;=&lt;/span&gt; buy&lt;span class="o"&gt;)&lt;/span&gt;
+- Filter &lt;span class="o"&gt;(&lt;/span&gt;symbol#0 &lt;span class="o"&gt;=&lt;/span&gt; BTC-USD&lt;span class="o"&gt;)&lt;/span&gt;
   +- Relation &lt;span class="o"&gt;[&lt;/span&gt;symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation&lt;span class="o"&gt;(&lt;/span&gt;trades&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;numPartitions&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1]

&lt;span class="o"&gt;==&lt;/span&gt; Optimized Logical Plan &lt;span class="o"&gt;==&lt;/span&gt;
Filter &lt;span class="o"&gt;((&lt;/span&gt;isnotnull&lt;span class="o"&gt;(&lt;/span&gt;symbol#0&lt;span class="o"&gt;)&lt;/span&gt; AND isnotnull&lt;span class="o"&gt;(&lt;/span&gt;side#1&lt;span class="o"&gt;))&lt;/span&gt; AND &lt;span class="o"&gt;((&lt;/span&gt;symbol#0 &lt;span class="o"&gt;=&lt;/span&gt; BTC-USD&lt;span class="o"&gt;)&lt;/span&gt; AND &lt;span class="o"&gt;(&lt;/span&gt;side#1 &lt;span class="o"&gt;=&lt;/span&gt; buy&lt;span class="o"&gt;)))&lt;/span&gt;
+- Relation &lt;span class="o"&gt;[&lt;/span&gt;symbol#0,side#1,price#2,amount#3,timestamp#4] JDBCRelation&lt;span class="o"&gt;(&lt;/span&gt;trades&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;numPartitions&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1]

&lt;span class="o"&gt;==&lt;/span&gt; Physical Plan &lt;span class="o"&gt;==&lt;/span&gt;
&lt;span class="k"&gt;*&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;1&lt;span class="o"&gt;)&lt;/span&gt; Scan JDBCRelation&lt;span class="o"&gt;(&lt;/span&gt;trades&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;numPartitions&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1] &lt;span class="o"&gt;[&lt;/span&gt;symbol#0,side#1,price#2,amount#3,timestamp#4] PushedFilters: &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="k"&gt;*&lt;/span&gt;IsNotNull&lt;span class="o"&gt;(&lt;/span&gt;symbol&lt;span class="o"&gt;)&lt;/span&gt;, &lt;span class="k"&gt;*&lt;/span&gt;IsNotNull&lt;span class="o"&gt;(&lt;/span&gt;side&lt;span class="o"&gt;)&lt;/span&gt;, &lt;span class="k"&gt;*&lt;/span&gt;EqualTo&lt;span class="o"&gt;(&lt;/span&gt;symbol,BTC-USD&lt;span class="o"&gt;)&lt;/span&gt;, &lt;span class="k"&gt;*&lt;/span&gt;EqualTo&lt;span class="o"&gt;(&lt;/span&gt;side,buy&lt;span class="o"&gt;)]&lt;/span&gt;, ReadSchema: struct&amp;lt;symbol:string,side:string,price:double,amount:double,timestamp:timestamp&amp;gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If the DataFrame knows how to reproduce the data by remembering the execution plan, it does not need to store the actual data. This is precisely what we have seen earlier. Spark desperately tried not to load our data, but this can have disadvantages too.&lt;/p&gt;

&lt;h3&gt;
  
  
  Caching data
&lt;/h3&gt;

&lt;p&gt;Not caching the data radically reduces Spark's memory footprint, but there is a trade-off here. The data does not have to be cached, because the plan printed above can be executed again and again and again...&lt;/p&gt;

&lt;p&gt;Now imagine how a decently-sized Spark cluster could make our lonely QuestDB instance suffer martyrdom.&lt;/p&gt;

&lt;p&gt;With a massive table containing many partitions, Spark would generate a large number of tasks to be executed in parallel across the different nodes of the cluster. These tasks would query the table almost simultaneously, putting a heavy load on the database. So, if you find your colleagues cooking breakfast on your database servers, consider forcing Spark to cache some data to reduce the number of trips to the database.&lt;/p&gt;

&lt;p&gt;This can be done by calling &lt;code&gt;df.cache()&lt;/code&gt;. In a large application, it might require a bit of thinking about what is worth caching and how to ensure that Spark executors have enough memory to store the data.&lt;/p&gt;

&lt;p&gt;In practice, you should consider caching smallish datasets used frequently throughout the application's life.&lt;/p&gt;
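&lt;p&gt;It is also worth knowing that a cached DataFrame can be released explicitly, and that the storage level can be tuned when executor memory is tight. A sketch using the standard PySpark API, with &lt;code&gt;df&lt;/code&gt; being the DataFrame created above:&lt;/p&gt;

```python
from pyspark import StorageLevel

# cache with an explicit storage level: spill partitions to disk
# when they do not fit in executor memory
df.persist(StorageLevel.MEMORY_AND_DISK)

# ... work with the cached DataFrame ...

# release the cached data once it is no longer needed
df.unpersist()
```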

&lt;p&gt;Let's rerun our code with a tiny modification, adding &lt;code&gt;.cache()&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;

&lt;span class="c1"&gt;# create Spark session
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;appName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"questdb_test"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;getOrCreate&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# load 'trades' table into the dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"jdbc"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"jdbc:postgresql://localhost:8812/questdb"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"driver"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"org.postgresql.Driver"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"admin"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"password"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"quest"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"dbtable"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"trades"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;load&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;cache&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# print the number of rows
&lt;/span&gt;&lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

&lt;span class="c1"&gt;# print the first 3 rows of the data
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;show&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;truncate&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This time Spark hit the database only twice. First, it came for the schema, the second time for the data:&lt;br&gt;
&lt;code&gt;SELECT "symbol","side","price","amount","timestamp" FROM trades&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;2023-03-21T21:13:04.122390Z I pg-server connected &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;ip&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;127.0.0.1, &lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129]
2023-03-21T21:13:04.147353Z I i.q.c.p.PGConnectionContext parse &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129, &lt;span class="nv"&gt;q&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;SELECT &lt;span class="k"&gt;*&lt;/span&gt; FROM trades WHERE &lt;span class="nv"&gt;1&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0]
2023-03-21T21:13:04.159470Z I pg-server disconnected &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;ip&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;127.0.0.1, &lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129, &lt;span class="nv"&gt;src&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;queue]
2023-03-21T21:13:05.873960Z I pg-server connected &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;ip&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;127.0.0.1, &lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129]
2023-03-21T21:13:05.875951Z I i.q.c.p.PGConnectionContext parse &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129, &lt;span class="nv"&gt;q&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;SELECT &lt;span class="s2"&gt;"symbol"&lt;/span&gt;,&lt;span class="s2"&gt;"side"&lt;/span&gt;,&lt;span class="s2"&gt;"price"&lt;/span&gt;,&lt;span class="s2"&gt;"amount"&lt;/span&gt;,&lt;span class="s2"&gt;"timestamp"&lt;/span&gt; FROM trades     &lt;span class="o"&gt;]&lt;/span&gt;
2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;rowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;487309, &lt;span class="nv"&gt;partitionNameTxn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;2, &lt;span class="nv"&gt;transientRowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionIndex&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0, &lt;span class="nv"&gt;partitionCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;3]
2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;rowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;457478, &lt;span class="nv"&gt;partitionNameTxn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1, &lt;span class="nv"&gt;transientRowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionIndex&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1, &lt;span class="nv"&gt;partitionCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;3]
2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;rowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionNameTxn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0, &lt;span class="nv"&gt;transientRowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionIndex&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;2, &lt;span class="nv"&gt;partitionCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;3]
2023-03-21T21:13:08.479209Z I pg-server disconnected &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;ip&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;127.0.0.1, &lt;span class="nv"&gt;fd&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;129, &lt;span class="nv"&gt;src&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;queue]

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Clearly, even a few carefully placed &lt;code&gt;.cache()&lt;/code&gt; calls can improve the overall performance of an application, sometimes significantly. What else should we take into consideration when thinking about performance?&lt;/p&gt;

&lt;p&gt;Earlier, we mentioned that our Spark application consists of tasks, which work on the different partitions of the data in parallel. So, partitioned data means parallelism, which results in better performance.&lt;/p&gt;

&lt;h3&gt;
  
  
  Spark data partitioning
&lt;/h3&gt;

&lt;p&gt;Now we turn to the Spark UI.&lt;/p&gt;

&lt;p&gt;It tells us that the job was done in a single task:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--htUlCDE8--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/yvbaohrfklyhvnripmcz.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--htUlCDE8--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/yvbaohrfklyhvnripmcz.png" alt="Spark UI showing one task completed" width="880" height="432"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In truth, we already suspected this. The execution plan told us (&lt;code&gt;numPartitions=1&lt;/code&gt;), and we did not see any parallelism in the QuestDB logs either. We can display more details about this partition with a bit of additional code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;spark_partition_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;min&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;max&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;

&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"partitionId"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;spark_partition_id&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;groupBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partitionId&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;agg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;min&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="nb"&gt;max&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partitionId&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;show&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;truncate&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;+-----------+--------------------------+--------------------------+------------------+
|partitionId|min&lt;span class="o"&gt;(&lt;/span&gt;timestamp&lt;span class="o"&gt;)&lt;/span&gt;            |max&lt;span class="o"&gt;(&lt;/span&gt;timestamp&lt;span class="o"&gt;)&lt;/span&gt;            |count&lt;span class="o"&gt;(&lt;/span&gt;partitionId&lt;span class="o"&gt;)&lt;/span&gt;|
+-----------+--------------------------+--------------------------+------------------+
|0          |2023-02-01 00:00:00.078073|2023-02-03 23:59:59.801778|1322570           |
+-----------+--------------------------+--------------------------+------------------+

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The output confirms that the data was loaded as a single partition, even though QuestDB stores it in 3 partitions. We should try to fix this.&lt;/p&gt;

&lt;p&gt;Although it is not recommended, we can try to use &lt;code&gt;DataFrame.repartition()&lt;/code&gt;. This call reshuffles the data across the cluster while repartitioning it, so it should be our last resort. After running &lt;code&gt;df.repartition(3, df.timestamp)&lt;/code&gt;, we see 3 partitions, but not exactly the way we expected. The partitions overlap with one another:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;+-----------+--------------------------+--------------------------+------------------+
|partitionId|min&lt;span class="o"&gt;(&lt;/span&gt;timestamp&lt;span class="o"&gt;)&lt;/span&gt;            |max&lt;span class="o"&gt;(&lt;/span&gt;timestamp&lt;span class="o"&gt;)&lt;/span&gt;            |count&lt;span class="o"&gt;(&lt;/span&gt;partitionId&lt;span class="o"&gt;)&lt;/span&gt;|
+-----------+--------------------------+--------------------------+------------------+
|0          |2023-02-01 00:00:00.698152|2023-02-03 23:59:59.801778|438550            |
|1          |2023-02-01 00:00:00.078073|2023-02-03 23:59:57.188894|440362            |
|2          |2023-02-01 00:00:00.828943|2023-02-03 23:59:59.309075|443658            |
+-----------+--------------------------+--------------------------+------------------+

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It seems that &lt;code&gt;DataFrame.repartition()&lt;/code&gt; used hashes to distribute the rows across the 3 partitions. This would mean that all 3 tasks would require data from all 3 QuestDB partitions.&lt;/p&gt;

&lt;p&gt;Let's try this instead: &lt;code&gt;df.repartitionByRange(3, "timestamp")&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;+-----------+--------------------------+--------------------------+------------------+
|partitionId|min&lt;span class="o"&gt;(&lt;/span&gt;timestamp&lt;span class="o"&gt;)&lt;/span&gt;            |max&lt;span class="o"&gt;(&lt;/span&gt;timestamp&lt;span class="o"&gt;)&lt;/span&gt;            |count&lt;span class="o"&gt;(&lt;/span&gt;partitionId&lt;span class="o"&gt;)&lt;/span&gt;|
+-----------+--------------------------+--------------------------+------------------+
|0          |2023-02-01 00:00:00.078073|2023-02-01 21:22:35.656399|429389            |
|1          |2023-02-01 21:22:35.665599|2023-02-02 21:45:02.613339|470372            |
|2          |2023-02-02 21:45:02.641778|2023-02-03 23:59:59.801778|422809            |
+-----------+--------------------------+--------------------------+------------------+

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This looks better, but still not ideal. That is because &lt;code&gt;DataFrame.repartitionByRange()&lt;/code&gt; samples the dataset and then estimates the borders of the partitions.&lt;/p&gt;

&lt;p&gt;What we really want is for the DataFrame partitions to match the partitions we see in QuestDB exactly. This way, the tasks running in parallel in Spark do not cross paths in QuestDB, which is likely to result in better performance.&lt;/p&gt;

&lt;p&gt;Data source options to the rescue! Let's try the following:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;spark_partition_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;min&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;max&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;

&lt;span class="c1"&gt;# create Spark session
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;appName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"questdb_test"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;getOrCreate&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# load 'trades' table into the dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"jdbc"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"jdbc:postgresql://localhost:8812/questdb"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"driver"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"org.postgresql.Driver"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"admin"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"password"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"quest"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"dbtable"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"trades"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"partitionColumn"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"numPartitions"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"3"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"lowerBound"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2023-02-01T00:00:00.000000Z"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"upperBound"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2023-02-04T00:00:00.000000Z"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;load&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"partitionId"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;spark_partition_id&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;groupBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partitionId&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;agg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;min&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="nb"&gt;max&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partitionId&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;show&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;truncate&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;+-----------+--------------------------+--------------------------+------------------+
|partitionId|min&lt;span class="o"&gt;(&lt;/span&gt;timestamp&lt;span class="o"&gt;)&lt;/span&gt;            |max&lt;span class="o"&gt;(&lt;/span&gt;timestamp&lt;span class="o"&gt;)&lt;/span&gt;            |count&lt;span class="o"&gt;(&lt;/span&gt;partitionId&lt;span class="o"&gt;)&lt;/span&gt;|
+-----------+--------------------------+--------------------------+------------------+
|0          |2023-02-01 00:00:00.078073|2023-02-01 23:59:59.664083|487309            |
|1          |2023-02-02 00:00:00.188002|2023-02-02 23:59:59.838473|457478            |
|2          |2023-02-03 00:00:00.565319|2023-02-03 23:59:59.801778|377783            |
+-----------+--------------------------+--------------------------+------------------+

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After specifying &lt;code&gt;partitionColumn&lt;/code&gt;, &lt;code&gt;numPartitions&lt;/code&gt;, &lt;code&gt;lowerBound&lt;/code&gt;, and &lt;code&gt;upperBound&lt;/code&gt;, the situation is much better: the row counts in the partitions match what we have seen in the QuestDB logs earlier: &lt;code&gt;rowCount=487309&lt;/code&gt;, &lt;code&gt;rowCount=457478&lt;/code&gt; and &lt;code&gt;rowCount=377783&lt;/code&gt;. Looks like we did it!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;2023-03-21T21:13:05.876615Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-01.2 &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;rowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;487309, &lt;span class="nv"&gt;partitionNameTxn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;2, &lt;span class="nv"&gt;transientRowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionIndex&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0, &lt;span class="nv"&gt;partitionCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;3]
2023-03-21T21:13:06.259472Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-02.1 &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;rowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;457478, &lt;span class="nv"&gt;partitionNameTxn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1, &lt;span class="nv"&gt;transientRowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionIndex&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1, &lt;span class="nv"&gt;partitionCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;3]
2023-03-21T21:13:06.653769Z I i.q.c.TableReader open partition /Users/imre/Work/dbroot/db/trades~10/2023-02-03.0 &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;rowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionNameTxn&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0, &lt;span class="nv"&gt;transientRowCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;377783, &lt;span class="nv"&gt;partitionIndex&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;2, &lt;span class="nv"&gt;partitionCount&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;3]

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can check the Spark UI again; it confirms that the job was completed in 3 separate tasks, each working on a single partition.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--vF1hUYwq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/h8oeb5pfy0ae5dk2gpzi.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--vF1hUYwq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/h8oeb5pfy0ae5dk2gpzi.png" alt="Spark UI showing three tasks completed" width="880" height="462"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Sometimes it might be tricky to know the minimum and maximum timestamps when creating the DataFrame. In the worst case, you could query the database for those values via an ordinary connection.&lt;/p&gt;
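&lt;p&gt;A small, hypothetical helper can then turn the result of such a query (&lt;code&gt;SELECT min(timestamp), max(timestamp) FROM trades&lt;/code&gt;, issued via any PostgreSQL client against port 8812) into the option values Spark expects:&lt;/p&gt;

```python
from datetime import datetime, timezone

# hypothetical helper: format min/max timestamps as Spark JDBC bound options
def jdbc_bounds(lo: datetime, hi: datetime) -> dict:
    fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
    return {"lowerBound": lo.strftime(fmt), "upperBound": hi.strftime(fmt)}

# in practice lo/hi come from the database; hard-coded here for illustration
opts = jdbc_bounds(datetime(2023, 2, 1, tzinfo=timezone.utc),
                   datetime(2023, 2, 4, tzinfo=timezone.utc))
print(opts["lowerBound"])   # 2023-02-01T00:00:00.000000Z
```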

&lt;p&gt;We have managed to replicate our QuestDB partitions in Spark, but data does not always come from a single table. What if the data required is the result of a query? Can we load that, and is it possible to partition it?&lt;/p&gt;

&lt;h3&gt;
  
  
  Options to load data: SQL query vs table
&lt;/h3&gt;

&lt;p&gt;We can use the &lt;code&gt;query&lt;/code&gt; option to load data from QuestDB with the help of a SQL query:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# 1-minute aggregated trade data&lt;/span&gt;
&lt;span class="nb"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; spark.read.format&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"jdbc"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"url"&lt;/span&gt;, &lt;span class="s2"&gt;"jdbc:postgresql://localhost:8812/questdb"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"driver"&lt;/span&gt;, &lt;span class="s2"&gt;"org.postgresql.Driver"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"user"&lt;/span&gt;, &lt;span class="s2"&gt;"admin"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;.option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"password"&lt;/span&gt;, &lt;span class="s2"&gt;"quest"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"query"&lt;/span&gt;, &lt;span class="s2"&gt;"SELECT symbol, sum(amount) as volume, "&lt;/span&gt;
                     &lt;span class="s2"&gt;"min(price) as minimum, max(price) as maximum, "&lt;/span&gt;
                     &lt;span class="s2"&gt;"round((max(price)+min(price))/2, 2) as mid, "&lt;/span&gt;
                     &lt;span class="s2"&gt;"timestamp as ts "&lt;/span&gt;
                     &lt;span class="s2"&gt;"FROM trades WHERE symbol = 'BTC-USD' "&lt;/span&gt;
                     &lt;span class="s2"&gt;"SAMPLE BY 1m ALIGN to CALENDAR"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .load&lt;span class="o"&gt;()&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Depending on the amount of data and the actual query, you might find that pushing the aggregations down to QuestDB is faster than computing them in Spark. Spark definitely has the edge when the dataset is really large.&lt;/p&gt;

&lt;p&gt;Now let's try partitioning this DataFrame with the options we used earlier with &lt;code&gt;dbtable&lt;/code&gt;. Unfortunately, we get an error message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;Options &lt;span class="s1"&gt;'query'&lt;/span&gt; and &lt;span class="s1"&gt;'partitionColumn'&lt;/span&gt; can not be specified together.

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;However, we can trick Spark by just giving the query an alias name. This means we can go back to using the &lt;code&gt;dbtable&lt;/code&gt; option again, which lets us specify partitioning. See the example below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;spark_partition_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;min&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;max&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;

&lt;span class="c1"&gt;# create Spark session
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;appName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"questdb_test"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;getOrCreate&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# load 1-minute aggregated trade data into the dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"jdbc"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"jdbc:postgresql://localhost:8812/questdb"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"driver"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"org.postgresql.Driver"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"admin"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"password"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"quest"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"dbtable"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"(SELECT symbol, sum(amount) as volume, "&lt;/span&gt;
                     &lt;span class="s"&gt;"min(price) as minimum, max(price) as maximum, "&lt;/span&gt;
                     &lt;span class="s"&gt;"round((max(price)+min(price))/2, 2) as mid, "&lt;/span&gt;
                     &lt;span class="s"&gt;"timestamp as ts "&lt;/span&gt;
                     &lt;span class="s"&gt;"FROM trades WHERE symbol = 'BTC-USD' "&lt;/span&gt;
                     &lt;span class="s"&gt;"SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"partitionColumn"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"ts"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"numPartitions"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"3"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"lowerBound"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2023-02-01T00:00:00.000000Z"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"upperBound"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2023-02-04T00:00:00.000000Z"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;load&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"partitionId"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;spark_partition_id&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;groupBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partitionId&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;agg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;min&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="nb"&gt;max&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partitionId&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;show&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;truncate&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;+-----------+-------------------+-------------------+------------------+
|partitionId|min&lt;span class="o"&gt;(&lt;/span&gt;ts&lt;span class="o"&gt;)&lt;/span&gt;            |max&lt;span class="o"&gt;(&lt;/span&gt;ts&lt;span class="o"&gt;)&lt;/span&gt;            |count&lt;span class="o"&gt;(&lt;/span&gt;partitionId&lt;span class="o"&gt;)&lt;/span&gt;|
+-----------+-------------------+-------------------+------------------+
|0          |2023-02-01 00:00:00|2023-02-01 23:59:00|1440              |
|1          |2023-02-02 00:00:00|2023-02-02 23:59:00|1440              |
|2          |2023-02-03 00:00:00|2023-02-03 23:59:00|1440              |
+-----------+-------------------+-------------------+------------------+

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Looking good. It seems that we can load any data from QuestDB into Spark by passing a SQL query to the DataFrame. Or can we?&lt;/p&gt;

&lt;p&gt;Our &lt;code&gt;trades&lt;/code&gt; table is limited to three data types only. What about all the other types you can find in QuestDB?&lt;/p&gt;

&lt;p&gt;We expect that Spark will successfully map a &lt;code&gt;double&lt;/code&gt; or a &lt;code&gt;timestamp&lt;/code&gt; when queried from the database, but what about a &lt;code&gt;geohash&lt;/code&gt;? It is not that obvious what is going to happen.&lt;/p&gt;

&lt;p&gt;As always, when unsure, we should test.&lt;/p&gt;

&lt;h3&gt;
  
  
  Type mappings
&lt;/h3&gt;

&lt;p&gt;I have another table in the database with a different schema. This table has a column for each type currently available in QuestDB.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;all_types&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;symbol&lt;/span&gt; &lt;span class="n"&gt;SYMBOL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;string&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nb"&gt;char&lt;/span&gt; &lt;span class="nb"&gt;CHAR&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;long&lt;/span&gt; &lt;span class="n"&gt;LONG&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nb"&gt;int&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;short&lt;/span&gt; &lt;span class="n"&gt;SHORT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nb"&gt;byte&lt;/span&gt; &lt;span class="nb"&gt;BYTE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nb"&gt;double&lt;/span&gt; &lt;span class="nb"&gt;DOUBLE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nb"&gt;float&lt;/span&gt; &lt;span class="nb"&gt;FLOAT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nb"&gt;bool&lt;/span&gt; &lt;span class="nb"&gt;BOOLEAN&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;uuid&lt;/span&gt; &lt;span class="n"&gt;UUID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="c1"&gt;--long128 LONG128,&lt;/span&gt;
  &lt;span class="n"&gt;long256&lt;/span&gt; &lt;span class="n"&gt;LONG256&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;bin&lt;/span&gt; &lt;span class="nb"&gt;BINARY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;g5c&lt;/span&gt; &lt;span class="n"&gt;GEOHASH&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="k"&gt;c&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="nb"&gt;date&lt;/span&gt; &lt;span class="nb"&gt;DATE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nb"&gt;timestamp&lt;/span&gt; &lt;span class="nb"&gt;TIMESTAMP&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;PARTITION&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="k"&gt;DAY&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;all_types&lt;/span&gt; &lt;span class="k"&gt;values&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'sym1'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'str1'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'a'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;456&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;345&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;234&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;123&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;888&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;8&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="mi"&gt;11&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'9f9b2131-d49f-4d1d-ab81-39815c50d341'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="c1"&gt;--to_long128(10, 5),&lt;/span&gt;
  &lt;span class="n"&gt;rnd_long256&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;rnd_bin&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;rnd_geohash&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;35&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="n"&gt;to_date&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'2022-02-01'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'yyyy-MM-dd'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="n"&gt;to_timestamp&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'2022-01-15T00:00:03.234'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'yyyy-MM-ddTHH:mm:ss.SSS'&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;long128&lt;/code&gt; is not fully supported by QuestDB yet, so it is commented out.&lt;/p&gt;

&lt;p&gt;Let's try to load and print the data; we can also take a look at the schema of the DataFrame:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;

&lt;span class="c1"&gt;# create Spark session
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;appName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"questdb_test"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;getOrCreate&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# create dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"jdbc"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"jdbc:postgresql://localhost:8812/questdb"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"driver"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"org.postgresql.Driver"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"admin"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"password"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"quest"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"dbtable"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"all_types"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;load&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# print the schema
&lt;/span&gt;&lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# print the content of the dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;show&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;truncate&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Much to my surprise, Spark managed to create the DataFrame and mapped all types.&lt;/p&gt;

&lt;p&gt;Here is the schema:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;StructType&lt;span class="o"&gt;([&lt;/span&gt;
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'symbol'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'string'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'char'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'long'&lt;/span&gt;, LongType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'int'&lt;/span&gt;, IntegerType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'short'&lt;/span&gt;, ShortType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'byte'&lt;/span&gt;, ShortType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'double'&lt;/span&gt;, DoubleType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'float'&lt;/span&gt;, FloatType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'bool'&lt;/span&gt;, BooleanType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'uuid'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
&lt;span class="c"&gt;#   StructField('long128', StringType(), True),&lt;/span&gt;
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'long256'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'bin'&lt;/span&gt;, BinaryType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'g5c'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'date'&lt;/span&gt;, TimestampType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'timestamp'&lt;/span&gt;, TimestampType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;])&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It looks pretty good, but you might wonder whether it is a good idea to map the &lt;code&gt;long256&lt;/code&gt; and &lt;code&gt;geohash&lt;/code&gt; types to &lt;code&gt;String&lt;/code&gt;. Since QuestDB does not provide arithmetic operations for these types, it is not a big deal.&lt;/p&gt;

&lt;p&gt;Geohashes are essentially base-32 numbers, represented and stored in their string format. The 256-bit long values are also treated as string literals; &lt;code&gt;long256&lt;/code&gt; is used to store cryptocurrency private keys.&lt;/p&gt;
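&lt;p&gt;To see why the string mapping is workable, here is a minimal, plain-Python sketch (independent of Spark and QuestDB; the helper name is illustrative only) showing that a geohash string really is just a base-32 number:&lt;/p&gt;

```python
# Minimal sketch: interpreting a geohash string as a base-32 number.
# The standard geohash alphabet is base 32 without the letters a, i, l and o.
GEOHASH_ALPHABET = "0123456789bcdefghjkmnpqrstuvwxyz"

def geohash_to_int(geohash: str) -> int:
    """Decode a geohash string into its numeric value, 5 bits per character."""
    value = 0
    for ch in geohash.lower():
        value = value * 32 + GEOHASH_ALPHABET.index(ch)
    return value

# 'q661k' is a 5-character geohash, i.e. a 25-bit number
print(geohash_to_int("q661k"))
```

&lt;p&gt;Since each character carries exactly 5 bits, prefix comparisons on the string form are equivalent to comparisons on the underlying number, which is why keeping geohashes as strings loses nothing for filtering.&lt;/p&gt;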

&lt;p&gt;Now let's see the data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;+------+------+----+----+---+-----+----+------+-----+----+------------------------------------+
|symbol|string|char|long|int|short|byte|double|float|bool|uuid                                |
+------+------+----+----+---+-----+----+------+-----+----+------------------------------------+
|sym1  |str1  |a   |456 |345|234  |123 |888.8 |11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341|
+------+------+----+----+---+-----+----+------+-----+----+------------------------------------+

+------------------------------------------------------------------+----------------------------------------------------+
|long256                                                           |bin                                                 |
+------------------------------------------------------------------+----------------------------------------------------+
|0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]|
+------------------------------------------------------------------+----------------------------------------------------+

+-----+-------------------+-----------------------+
|g5c  |date               |timestamp              |
+-----+-------------------+-----------------------+
|q661k|2022-02-01 00:00:00|2022-01-15 00:00:03.234|
+-----+-------------------+-----------------------+

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This also looks good, but we could omit the &lt;code&gt;00:00:00&lt;/code&gt; from the end of the date field: the column is mapped to &lt;code&gt;Timestamp&lt;/code&gt; rather than &lt;code&gt;Date&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;We could also try to map one of the numeric fields to &lt;code&gt;Decimal&lt;/code&gt;. This can be useful if we later want to run computations that require high precision.&lt;/p&gt;
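&lt;p&gt;The precision difference is easy to demonstrate in plain Python, without Spark: binary floating point cannot represent most decimal fractions exactly, while decimal arithmetic can.&lt;/p&gt;

```python
from decimal import Decimal

# Binary doubles accumulate representation error on decimal fractions
float_sum = 0.1 + 0.2
print(float_sum)  # 0.30000000000000004

# Decimal arithmetic keeps exact decimal precision,
# which matters for monetary values and aggregations over many rows
decimal_sum = Decimal("0.1") + Decimal("0.2")
print(decimal_sum)  # 0.3
```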

&lt;p&gt;We can use the &lt;code&gt;customSchema&lt;/code&gt; option to customize the column types. Our modified code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;

&lt;span class="c1"&gt;# create Spark session
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;appName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"questdb_test"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;getOrCreate&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# create dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"jdbc"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"jdbc:postgresql://localhost:8812/questdb"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"driver"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"org.postgresql.Driver"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"admin"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"password"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"quest"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"dbtable"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"all_types"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"customSchema"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"date DATE, double DECIMAL(38, 10)"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;load&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# print the schema
&lt;/span&gt;&lt;span class="k"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# print the content of the dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;show&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;truncate&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The new schema:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;StructType&lt;span class="o"&gt;([&lt;/span&gt;
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'symbol'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'string'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'char'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'long'&lt;/span&gt;, LongType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'int'&lt;/span&gt;, IntegerType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'short'&lt;/span&gt;, ShortType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'byte'&lt;/span&gt;, ShortType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'double'&lt;/span&gt;, DecimalType&lt;span class="o"&gt;(&lt;/span&gt;38,10&lt;span class="o"&gt;)&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'float'&lt;/span&gt;, FloatType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'bool'&lt;/span&gt;, BooleanType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'uuid'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
&lt;span class="c"&gt;#   StructField('long128', StringType(), True),&lt;/span&gt;
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'long256'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'bin'&lt;/span&gt;, BinaryType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'g5c'&lt;/span&gt;, StringType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'date'&lt;/span&gt;, DateType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;,
  StructField&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'timestamp'&lt;/span&gt;, TimestampType&lt;span class="o"&gt;()&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;])&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And the data is displayed as:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;+------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+
|symbol|string|char|long|int|short|byte|double        |float|bool|uuid                                |
+------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+
|sym1  |str1  |a   |456 |345|234  |123 |888.8000000000|11.1 |true|9f9b2131-d49f-4d1d-ab81-39815c50d341|
+------+------+----+----+---+-----+----+--------------+-----+----+------------------------------------+

+------------------------------------------------------------------+----------------------------------------------------+
|long256                                                           |bin                                                 |
+------------------------------------------------------------------+----------------------------------------------------+
|0x8ee3c2a42acced0bb0bdb411c82bb01e7e487689228c189d1e865fa0e025973c|[F2 4D 4B F6 18 C2 9A A7 87 C2 D3 19 4A 2C 4B 92 C4]|
+------------------------------------------------------------------+----------------------------------------------------+

+-----+----------+-----------------------+
|g5c  |date      |timestamp              |
+-----+----------+-----------------------+
|q661k|2022-02-01|2022-01-15 00:00:03.234|
+-----+----------+-----------------------+

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It seems that Spark can handle almost all database types. The only issue is &lt;code&gt;long128&lt;/code&gt;, but this type is currently a work in progress in QuestDB. When completed, it will be mapped to &lt;code&gt;String&lt;/code&gt;, just like &lt;code&gt;long256&lt;/code&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Writing data back into the database
&lt;/h2&gt;

&lt;p&gt;There is only one thing left: writing data back into QuestDB.&lt;/p&gt;

&lt;p&gt;In this example, we will first load some data from the database and add two new features:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;10-minute moving average&lt;/li&gt;
&lt;li&gt;standard deviation, also calculated over the last 10-minute window&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Then we will try to save the modified DataFrame back into QuestDB as a new table. We need to take care of some type mappings, as &lt;code&gt;Double&lt;/code&gt; columns are sent to QuestDB as &lt;code&gt;FLOAT8&lt;/code&gt; by default, so we end up with this code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql.window&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Window&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="nn"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;avg&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;stddev&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;when&lt;/span&gt;

&lt;span class="c1"&gt;# create Spark session
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;appName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"questdb_test"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;getOrCreate&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# load 1-minute aggregated trade data into the dataframe
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"jdbc"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"jdbc:postgresql://localhost:8812/questdb"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"driver"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"org.postgresql.Driver"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"admin"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"password"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"quest"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"dbtable"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"(SELECT symbol, sum(amount) as volume, "&lt;/span&gt;
                       &lt;span class="s"&gt;"round((max(price)+min(price))/2, 2) as mid, "&lt;/span&gt;
                       &lt;span class="s"&gt;"timestamp as ts "&lt;/span&gt;
                       &lt;span class="s"&gt;"FROM trades WHERE symbol = 'BTC-USD' "&lt;/span&gt;
                       &lt;span class="s"&gt;"SAMPLE BY 1m ALIGN to CALENDAR) AS fake_table"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"partitionColumn"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"ts"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"numPartitions"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"3"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"lowerBound"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2023-02-01T00:00:00.000000Z"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"upperBound"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"2023-02-04T00:00:00.000000Z"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;load&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# add new features
&lt;/span&gt;&lt;span class="n"&gt;window_10&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Window&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;partitionBy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;rowsBetween&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Window&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;currentRow&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"ma10"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;avg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mid&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;over&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;window_10&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"std"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;stddev&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mid&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;over&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;window_10&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;withColumn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"std"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;when&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;std&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;isNull&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="mf"&gt;0.0&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;otherwise&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;std&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

&lt;span class="c1"&gt;# save the data as 'trades_enriched'
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"jdbc"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"url"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"jdbc:postgresql://localhost:8812/questdb"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"driver"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"org.postgresql.Driver"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"user"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"admin"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"password"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"quest"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"dbtable"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"trades_enriched"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"createTableColumnTypes"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;save&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;All works, but… we soon realize that our new table, &lt;code&gt;trades_enriched&lt;/code&gt;, is not partitioned and does not have a designated timestamp, which is not ideal. Obviously, Spark has no idea about QuestDB specifics.&lt;/p&gt;

&lt;p&gt;It would work better if we created the table upfront and Spark only saved the data into it. We drop the table and re-create it; this time, it is partitioned and has a designated timestamp:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;DROP&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;trades_enriched&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;trades_enriched&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;volume&lt;/span&gt; &lt;span class="nb"&gt;DOUBLE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;mid&lt;/span&gt; &lt;span class="nb"&gt;DOUBLE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;ts&lt;/span&gt; &lt;span class="nb"&gt;TIMESTAMP&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;ma10&lt;/span&gt; &lt;span class="nb"&gt;DOUBLE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;std&lt;/span&gt; &lt;span class="nb"&gt;DOUBLE&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;PARTITION&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="k"&gt;DAY&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The table is empty and waiting for the data.&lt;/p&gt;

&lt;p&gt;We rerun the code; all works, no complaints. The data is in the table, and it is partitioned.&lt;/p&gt;

&lt;p&gt;One aspect of writing data into the database is whether we are allowed to create duplicates. What if I try to rerun the code again without dropping the table? Will Spark let me save the data this time? &lt;br&gt;
No, we get an error:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pyspark.sql.utils.AnalysisException: Table or view &lt;span class="s1"&gt;'trades_enriched'&lt;/span&gt; already exists. SaveMode: ErrorIfExists.

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The last part of the error message looks interesting: &lt;code&gt;SaveMode: ErrorIfExists&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;What is &lt;code&gt;SaveMode&lt;/code&gt;? It turns out we can configure what should happen if the table already exists. Our options are:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;errorifexists&lt;/code&gt;: the default behavior is to raise an error if the table
already exists; Spark is playing it safe here&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;append&lt;/code&gt; : data will be appended to the existing rows already present in the
table&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;overwrite&lt;/code&gt;: the content of the table will be replaced entirely by the newly
saved data&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;ignore&lt;/code&gt;: if the table is not empty, our save operation gets ignored without
any error&lt;/li&gt;
&lt;/ul&gt;
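&lt;p&gt;To make the choice explicit in code, the save mode can be set on the writer itself via &lt;code&gt;mode()&lt;/code&gt;. The sketch below gathers the JDBC options from the earlier examples into a dictionary and validates the mode before writing; the helper function and the dictionary are only illustrative, they are not part of Spark's API:&lt;/p&gt;

```python
# Sketch: writing a DataFrame to QuestDB over JDBC with an explicit
# save mode. Connection details match the earlier examples; the helper
# itself is illustrative, not part of Spark.

VALID_MODES = {"errorifexists", "append", "overwrite", "ignore"}

JDBC_OPTIONS = {
    "url": "jdbc:postgresql://localhost:8812/questdb",
    "driver": "org.postgresql.Driver",
    "user": "admin",
    "password": "quest",
    "dbtable": "trades_enriched",
}

def write_with_mode(df, mode="append", options=JDBC_OPTIONS):
    """Write a Spark DataFrame via JDBC with an explicit save mode."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown save mode: {mode}")
    writer = df.write.format("jdbc").mode(mode)
    for key, value in options.items():
        writer = writer.option(key, value)
    writer.save()
```

&lt;p&gt;Note that with &lt;code&gt;append&lt;/code&gt; the rows simply accumulate, so rerunning the same job twice duplicates the data; this is exactly the duplication question raised above.&lt;/p&gt;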

&lt;p&gt;We have already seen how &lt;code&gt;errorifexists&lt;/code&gt; behaves; &lt;code&gt;append&lt;/code&gt; and &lt;code&gt;ignore&lt;/code&gt; seem simple enough to just work.&lt;/p&gt;

&lt;p&gt;However, &lt;code&gt;overwrite&lt;/code&gt; is not straightforward. The content of the table must be cleared before the new data can be saved. Spark will delete and re-create the table by default, which means losing partitioning and the designated timestamp.&lt;/p&gt;

&lt;p&gt;In general, we do not want Spark to create tables for us. Luckily, with the &lt;code&gt;truncate&lt;/code&gt; option we can tell Spark to use &lt;code&gt;TRUNCATE&lt;/code&gt; to clear the table instead of deleting it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# save the data as 'trades_enriched', overwrite if already exists&lt;/span&gt;
df.write.format&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"jdbc"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"url"&lt;/span&gt;, &lt;span class="s2"&gt;"jdbc:postgresql://localhost:8812/questdb"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"driver"&lt;/span&gt;, &lt;span class="s2"&gt;"org.postgresql.Driver"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"user"&lt;/span&gt;, &lt;span class="s2"&gt;"admin"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;.option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"password"&lt;/span&gt;, &lt;span class="s2"&gt;"quest"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"dbtable"&lt;/span&gt;, &lt;span class="s2"&gt;"trades_enriched"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"truncate"&lt;/span&gt;, True&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .option&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"createTableColumnTypes"&lt;/span&gt;, &lt;span class="s2"&gt;"volume DOUBLE, mid DOUBLE, ma10 DOUBLE, std DOUBLE"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    .save&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;mode&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"overwrite"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The above works as expected.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Our ride might seem bumpy, but we finally have everything&lt;br&gt;
working. Our new motto should be "There is a config option for everything!".&lt;/p&gt;

&lt;p&gt;To summarize what we have found:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;We can use Spark's JDBC data source to integrate with QuestDB.&lt;/li&gt;
&lt;li&gt;It is recommended to use the &lt;code&gt;dbtable&lt;/code&gt; option, even if we use a SQL query to load data.&lt;/li&gt;
&lt;li&gt;Always try to specify the partitioning options (&lt;code&gt;partitionColumn&lt;/code&gt;, &lt;code&gt;numPartitions&lt;/code&gt;, &lt;code&gt;lowerBound&lt;/code&gt; and &lt;code&gt;upperBound&lt;/code&gt;) when loading data; the partitions should ideally match the QuestDB partitions.&lt;/li&gt;
&lt;li&gt;Sometimes it makes sense to cache some data in Spark to reduce the number of trips to the database.&lt;/li&gt;
&lt;li&gt;It can be beneficial to push work down into the database, depending on the task and how much data is involved. It makes sense to use QuestDB's time-series-specific features, such as &lt;code&gt;SAMPLE BY&lt;/code&gt;, instead of trying to reimplement them in Spark.&lt;/li&gt;
&lt;li&gt;Type mappings can be customized via the &lt;code&gt;customSchema&lt;/code&gt; option when loading data.&lt;/li&gt;
&lt;li&gt;When writing data into QuestDB, always specify the desired save mode.&lt;/li&gt;
&lt;li&gt;It generally works better if you create the table upfront and do not let Spark create it; this way you can add partitioning and a designated timestamp.&lt;/li&gt;
&lt;li&gt;If you select the &lt;code&gt;overwrite&lt;/code&gt; save mode, enable the &lt;code&gt;truncate&lt;/code&gt; option too to make sure Spark does not delete the table; this way partitioning and the designated timestamp will not get lost.&lt;/li&gt;
&lt;li&gt;Type mappings can be customized via the &lt;code&gt;createTableColumnTypes&lt;/code&gt; option when saving data.&lt;/li&gt;
&lt;/ul&gt;
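&lt;p&gt;For reference, the partitioning options from the list above can be assembled like this. The bounds and partition count below are purely illustrative, and the helper is not part of Spark; it just collects the options that Spark's JDBC source understands:&lt;/p&gt;

```python
# Sketch: assembling JDBC load options so Spark reads the table in
# parallel. Ideally the Spark partitions line up with QuestDB's own
# (e.g. daily) partitions; the bounds and count here are illustrative.

def jdbc_read_options(table, partition_column, lower, upper, num_partitions):
    return {
        "url": "jdbc:postgresql://localhost:8812/questdb",
        "driver": "org.postgresql.Driver",
        "user": "admin",
        "password": "quest",
        "dbtable": table,
        "partitionColumn": partition_column,
        "lowerBound": lower,
        "upperBound": upper,
        "numPartitions": str(num_partitions),
    }

options = jdbc_read_options(
    "trades", "ts", "2023-02-01 00:00:00", "2023-03-01 00:00:00", 28
)
# with a SparkSession and a running QuestDB:
# df = spark.read.format("jdbc").options(**options).load()
```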

&lt;p&gt;I mentioned only the config options that are most likely to be tweaked when working with QuestDB; the complete set of options can be found here:&lt;br&gt;
&lt;a href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option"&gt;Spark data source options&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  What could the future bring?
&lt;/h2&gt;

&lt;p&gt;Overall everything works, but it would be nice to have a much more seamless integration, where partitioning would be taken care of automagically. Some type mappings could use better defaults, too, when saving data into QuestDB. The&lt;br&gt;
&lt;code&gt;overwrite&lt;/code&gt; save mode could default to using &lt;code&gt;TRUNCATE&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;More seamless integration is not impossible to achieve. If QuestDB provided its own &lt;code&gt;JDBCDialect&lt;/code&gt; implementation for Spark, the above nuances could be handled.&lt;br&gt;
We should probably consider adding this.&lt;/p&gt;

&lt;p&gt;Finally, there is one more thing we have not mentioned yet: data locality. That is because QuestDB currently cannot run as a cluster. However, we are actively working on a solution - check out &lt;a href="https://questdb.io/blog/inner-workings-distributed-databases/"&gt;The Inner Workings of Distributed Databases&lt;/a&gt;&lt;br&gt;
for more information.&lt;/p&gt;

&lt;p&gt;When the time comes, we should ensure that data locality is also considered. Ideally, each Spark node would work on tasks that require partitions loaded from the local (or closest) QuestDB instance.&lt;/p&gt;

&lt;p&gt;However, this is not something we should be concerned about at this moment... for now, just enjoy data crunching!&lt;/p&gt;

</description>
      <category>tutorial</category>
      <category>spark</category>
      <category>questdb</category>
      <category>database</category>
    </item>
  </channel>
</rss>
