Live Notebook

You can run this notebook in a live session on Binder or view it on GitHub.

Operating on Dask Dataframes with SQL

Dask-SQL is an open source project and Python package leveraging Apache Calcite to provide a SQL frontend for Dask dataframe operations, allowing SQL users to take advantage of Dask’s distributed capabilities without requiring extensive knowledge of the dataframe API.

[1]:
! pip install dask-sql
Collecting dask-sql
  Downloading dask_sql-2022.6.0-py3-none-any.whl (21.1 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 21.1/21.1 MB 71.0 MB/s eta 0:00:00
Collecting uvicorn>=0.11.3
  Downloading uvicorn-0.18.2-py3-none-any.whl (57 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 57.0/57.0 KB 12.7 MB/s eta 0:00:00
Requirement already satisfied: tabulate in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (0.8.9)
Requirement already satisfied: nest-asyncio in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (1.5.5)
Collecting tzlocal>=2.1
  Downloading tzlocal-4.2-py3-none-any.whl (19 kB)
Requirement already satisfied: pandas>=1.0.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (1.4.2)
Requirement already satisfied: pygments in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (2.12.0)
Requirement already satisfied: prompt-toolkit in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (3.0.29)
Collecting jpype1>=1.0.2
  Downloading JPype1-1.4.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl (453 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 453.8/453.8 KB 69.3 MB/s eta 0:00:00
Requirement already satisfied: dask[dataframe,distributed]<=2022.5.2,>=2022.3.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (2022.5.0)
Collecting fastapi>=0.61.1
  Downloading fastapi-0.79.0-py3-none-any.whl (54 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 54.6/54.6 KB 13.5 MB/s eta 0:00:00
Requirement already satisfied: packaging>=20.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (21.3)
Requirement already satisfied: fsspec>=0.6.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2022.3.0)
Requirement already satisfied: toolz>=0.8.2 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (0.11.2)
Requirement already satisfied: cloudpickle>=1.1.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2.0.0)
Requirement already satisfied: pyyaml>=5.3.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (6.0)
Requirement already satisfied: partd>=0.3.10 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.2.0)
Requirement already satisfied: numpy>=1.18 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.22.3)
Requirement already satisfied: distributed==2022.05.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2022.5.0)
Requirement already satisfied: jinja2 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (3.1.1)
Requirement already satisfied: urllib3 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.26.9)
Requirement already satisfied: tblib>=1.6.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.7.0)
Requirement already satisfied: tornado>=6.0.3 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (6.1)
Requirement already satisfied: sortedcontainers!=2.0.0,!=2.0.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2.4.0)
Requirement already satisfied: msgpack>=0.6.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.0.3)
Requirement already satisfied: locket>=1.0.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.0.0)
Requirement already satisfied: zict>=0.1.3 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2.2.0)
Requirement already satisfied: click>=6.6 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (8.1.3)
Requirement already satisfied: psutil>=5.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (5.9.0)
Requirement already satisfied: pydantic!=1.7,!=1.7.1,!=1.7.2,!=1.7.3,!=1.8,!=1.8.1,<2.0.0,>=1.6.2 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from fastapi>=0.61.1->dask-sql) (1.9.1)
Collecting starlette==0.19.1
  Downloading starlette-0.19.1-py3-none-any.whl (63 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 63.3/63.3 KB 16.2 MB/s eta 0:00:00
Requirement already satisfied: anyio<5,>=3.4.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from starlette==0.19.1->fastapi>=0.61.1->dask-sql) (3.5.0)
Requirement already satisfied: typing-extensions>=3.10.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from starlette==0.19.1->fastapi>=0.61.1->dask-sql) (4.2.0)
Requirement already satisfied: python-dateutil>=2.8.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from pandas>=1.0.0->dask-sql) (2.8.2)
Requirement already satisfied: pytz>=2020.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from pandas>=1.0.0->dask-sql) (2022.1)
Collecting pytz-deprecation-shim
  Downloading pytz_deprecation_shim-0.1.0.post0-py2.py3-none-any.whl (15 kB)
Collecting h11>=0.8
  Downloading h11-0.13.0-py3-none-any.whl (58 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 58.2/58.2 KB 18.7 MB/s eta 0:00:00
Requirement already satisfied: wcwidth in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from prompt-toolkit->dask-sql) (0.2.5)
Requirement already satisfied: pyparsing!=3.0.5,>=2.0.2 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from packaging>=20.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (3.0.8)
Requirement already satisfied: six>=1.5 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from python-dateutil>=2.8.1->pandas>=1.0.0->dask-sql) (1.16.0)
Collecting tzdata
  Downloading tzdata-2022.1-py2.py3-none-any.whl (339 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 339.5/339.5 KB 61.7 MB/s eta 0:00:00
Requirement already satisfied: sniffio>=1.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from anyio<5,>=3.4.0->starlette==0.19.1->fastapi>=0.61.1->dask-sql) (1.2.0)
Requirement already satisfied: idna>=2.8 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from anyio<5,>=3.4.0->starlette==0.19.1->fastapi>=0.61.1->dask-sql) (3.3)
Requirement already satisfied: heapdict in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from zict>=0.1.3->distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.0.1)
Requirement already satisfied: MarkupSafe>=2.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from jinja2->distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2.1.1)
Installing collected packages: tzdata, jpype1, h11, uvicorn, starlette, pytz-deprecation-shim, tzlocal, fastapi, dask-sql
Successfully installed dask-sql-2022.6.0 fastapi-0.79.0 h11-0.13.0 jpype1-1.4.0 pytz-deprecation-shim-0.1.0.post0 starlette-0.19.1 tzdata-2022.1 tzlocal-4.2 uvicorn-0.18.2

Set up a Dask cluster

Setting up a Dask cluster is optional, but it can dramatically expand our options for distributed computation by giving us access to Dask workers on GPUs, remote machines, common cloud providers, and more. Additionally, connecting our cluster to a Dask Client gives us access to a dashboard, which can be used to monitor the progress of active computations and diagnose issues.
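If a distributed cluster is already running elsewhere, the Client can instead be pointed at its scheduler address; a minimal sketch, where the address below is a placeholder rather than a real deployment:

# connect to an existing remote scheduler instead of starting a local cluster
# (the scheduler address is a placeholder for illustration only)
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")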

For this notebook, we will create a local cluster and connect it to a client. Once the client has been created, a link will appear to its associated dashboard, which can be viewed throughout the following computations.

[2]:
from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client
[2]:

Client: Client-2c014484-0de0-11ed-9c67-000d3a8f7959
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Create a context

A dask_sql.Context is the Python equivalent to a SQL database, serving as an interface to register all tables and functions used in SQL queries, as well as execute the queries themselves. In typical usage, a single Context is created and used for the duration of a Python script or notebook.

[3]:
from dask_sql import Context
c = Context()
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask_sql/java.py:39: UserWarning: You are running in a conda environment, but the JAVA_PATH is not using it. If this is by mistake, set $JAVA_HOME to /usr/share/miniconda3/envs/dask-examples, instead of /usr/lib/jvm/temurin-11-jdk-amd64.
  warnings.warn(

Load and register data

Once a Context has been created, there are a variety of ways to register tables in it. The simplest is the create_table method, which accepts a variety of input types from which Dask-SQL infers the table creation method. Supported input types include:

  • Dask / pandas-like dataframes

  • Strings pointing to data in local or remote storage

The input type can also be specified explicitly by providing a format. When registered, tables can optionally be persisted into memory by passing persist=True, which can greatly speed up repeated queries on the same table at the cost of loading the entire table into memory. For more information, see Data Loading and Input.

[4]:
import pandas as pd
from dask.datasets import timeseries

# register and persist a dask table
ddf = timeseries()
c.create_table("dask", ddf, persist=True)

# register a pandas table (implicitly converted to a dask table)
df = pd.DataFrame({"a": [1, 2, 3]})
c.create_table("pandas", df)

# register a table from local storage; kwargs are passed on to the underlying table creation method
c.create_table(
    "local",
    "surveys/data/2021-user-survey-results.csv.gz",
    format="csv",
    parse_dates=['Timestamp'],
    blocksize=None
)

Tables can also be registered through SQL CREATE TABLE WITH or CREATE TABLE AS statements, using the sql method.

[5]:
# replace our table from local storage
c.sql("""
    CREATE OR REPLACE TABLE
        "local"
    WITH (
        location = 'surveys/data/2021-user-survey-results.csv.gz',
        format = 'csv',
        parse_dates = ARRAY [ 'Timestamp' ]
    )
""")

# create a new table from a SQL query
c.sql("""
    CREATE TABLE filtered AS (
        SELECT id, name FROM dask WHERE name = 'Zelda'
    )
""")
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/io/csv.py:533: UserWarning: Warning gzip compression does not support breaking apart files
Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  warn(

All registered tables can be listed with a SHOW TABLES statement.

[6]:
c.sql("SHOW TABLES FROM root").compute()
[6]:
Table
0 dask
1 pandas
2 local
3 filtered
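A table that is no longer needed can be removed again; assuming Dask-SQL's DROP TABLE support, a small sketch (left unexecuted here so the listing above stays accurate):

# unregister the filtered table created earlier
c.sql("DROP TABLE filtered")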

Dask-SQL currently offers experimental GPU support, powered by the RAPIDS suite of open source GPU data science libraries. Input support is currently limited to Dask / Pandas-like dataframes and data in local/remote storage, and though most queries run without issue, users should expect some bugs or undefined behavior. To register a table and mark it for use on GPUs, gpu=True can be passed to a standard create_table call, or its equivalent CREATE TABLE WITH query (note that this requires cuDF and Dask-cuDF).

# register a dask table for use on GPUs (not possible in this binder)
c.create_table("gpu_dask", ddf, gpu=True)

# load in a table from disk using GPU-accelerated IO operations
c.sql("""
    CREATE TABLE
        "gpu_local"
    WITH (
        location = 'surveys/data/2021-user-survey-results.csv.gz',
        format = 'csv',
        parse_dates = ARRAY [ 'Timestamp' ],
        gpu = True
    )
""")

Query the data

When the sql method is called, Dask-SQL hands the query off to Apache Calcite, which converts it into relational algebra - essentially a list of SQL tasks that must be executed in order to get a result. The relational algebra of any query can be viewed directly using the explain method.

[7]:
print(c.explain("SELECT AVG(x) FROM dask"))
DaskProject(EXPR$0=[/(CAST(CASE(=($1, 0), null:DOUBLE, $0)):DECIMAL(19, 15), $1)]): rowcount = 10.0, cumulative cost = {122.5 rows, 111.0 cpu, 0.0 io}, id = 83
  DaskAggregate(group=[{}], agg#0=[$SUM0($2)], agg#1=[COUNT($2)]): rowcount = 10.0, cumulative cost = {112.5 rows, 101.0 cpu, 0.0 io}, id = 82
    DaskTableScan(table=[[root, dask]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 77

From here, this relational algebra is then converted into a Dask computation graph, which ultimately returns (or in the case of CREATE TABLE statements, implicitly assigns) a Dask dataframe.

[8]:
c.sql("SELECT AVG(x) FROM dask")
[8]:
Dask DataFrame Structure:
AVG("dask"."x")
npartitions=1
float64
...
Dask Name: rename, 107 tasks

Dask dataframes are lazy, meaning that at the time of their creation, none of their dependent tasks have been executed yet. To actually execute these tasks and get a result, we must call compute.

[9]:
c.sql("SELECT AVG(x) FROM dask").compute()
[9]:
AVG("dask"."x")
0 -0.000302

Looking at the dashboard, we can see that executing this query has triggered some Dask computations.

Because the return value of a query is a Dask dataframe, it is also possible to do follow-up operations on it using Dask’s dataframe API. This can be useful when we want to use SQL for operations that are difficult or impossible to express through the dataframe API (such as the multi-column sort below), then follow up with simpler operations that the dataframe API expresses easily.

[10]:
# perform a multi-column sort that isn't possible in Dask
res = c.sql("""
    SELECT * FROM dask ORDER BY name ASC, id DESC, x ASC
""")

# now do some follow-up groupby aggregations
res.groupby("name").agg({"x": "sum", "y": "mean"}).compute()
[10]:
x y
name
Alice -249.383593 0.001241
Bob 160.839932 0.000056
Charlie -77.458027 -0.001389
Dan 141.385152 -0.001548
Edith -33.965445 -0.000867
Frank 31.380364 -0.000966
George 291.711276 -0.002320
Hannah 76.193943 -0.001283
Ingrid 69.657261 -0.001849
Jerry -35.406853 -0.002052
Kevin -199.853191 0.000221
Laura 98.363175 -0.001911
Michael -100.410534 0.004294
Norbert 189.525214 -0.000738
Oliver -251.094045 -0.000164
Patricia -37.815014 0.003536
Quinn -137.963034 -0.001342
Ray -274.337917 0.004108
Sarah -237.457164 0.001387
Tim 67.416750 0.001667
Ursula -188.578720 0.002330
Victor -60.309784 -0.000196
Wendy 128.743367 0.000112
Xavier -158.350232 -0.001734
Yvonne 43.986670 0.001555
Zelda -38.438229 0.001045

Custom functions and aggregations

When standard SQL functionality is insufficient, it is possible to register custom functions for use in queries. These functions can be classified as one of the following:

  • Column-wise functions

  • Row-wise functions

  • Aggregations

Column-wise functions

Column-wise functions can take columns or literal values as input and return a column of an identical length. Column-wise functions can be registered in a Context using the register_function method.

[11]:
import numpy as np

def f(x):
    return x ** 2

c.register_function(f, "f", [("x", np.float64)], np.float64)

Function registration requires the following inputs:

  • A callable function

  • A name for the function to be referred to in queries

  • A list of tuples, representing the input variables and their respective types, which can be either Pandas or NumPy types

  • A type for the output column

Once a function has been registered, it can be called like any other standard SQL function.

[12]:
c.sql("SELECT F(x) FROM dask").compute()
[12]:
"F"("dask"."x")
timestamp
2000-01-01 00:00:00 0.408645
2000-01-01 00:00:01 0.497901
2000-01-01 00:00:02 0.064370
2000-01-01 00:00:03 0.421497
2000-01-01 00:00:04 0.304109
... ...
2000-01-30 23:59:55 0.691240
2000-01-30 23:59:56 0.499867
2000-01-30 23:59:57 0.049903
2000-01-30 23:59:58 0.004089
2000-01-30 23:59:59 0.490209

2592000 rows × 1 columns

Row-wise functions

In some cases, it may be easier to write a custom function that processes a dict-like row object - otherwise known as a row-wise function. These functions can also be registered using register_function by passing row_udf=True, and used in the same manner as a column-wise function.

[13]:
def g(row):
    if row["x"] > row["y"]:
        return row["x"] - row["y"]
    return row["y"] - row["x"]

c.register_function(g, "g", [("x", np.float64), ("y", np.float64)], np.float64, row_udf=True)

c.sql("SELECT G(x, y) FROM dask").compute()
[13]:
"G"("dask"."x", "dask"."y")
timestamp
2000-01-01 00:00:00 0.446911
2000-01-01 00:00:01 0.900878
2000-01-01 00:00:02 0.052787
2000-01-01 00:00:03 0.454549
2000-01-01 00:00:04 1.157125
... ...
2000-01-30 23:59:55 1.603634
2000-01-30 23:59:56 1.389727
2000-01-30 23:59:57 0.671131
2000-01-30 23:59:58 0.773367
2000-01-30 23:59:59 0.023842

2592000 rows × 1 columns

Note that unlike column-wise functions, which are called directly using specified columns and literals as input, row-wise functions are called using apply, which can have unpredictable performance depending on the underlying dataframe library (e.g. Pandas, cuDF) and the function itself.
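For instance, the row-wise function g above reduces to an absolute difference, so the same logic could also be registered column-wise and avoid apply entirely; a minimal sketch (the name g_abs is ours, not part of the original example):

# a column-wise equivalent of g: |x - y| computed directly on the columns
def g_abs(x, y):
    return abs(x - y)

c.register_function(g_abs, "g_abs", [("x", np.float64), ("y", np.float64)], np.float64)

c.sql("SELECT G_ABS(x, y) FROM dask").compute()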

Aggregations

Aggregations take a single column as input and return a single value - thus, they can only be used to reduce the results of a GROUP BY query. Aggregations can be registered using the register_aggregation method, which is functionally similar to register_function but takes a Dask Aggregation as input instead of a callable function.

[14]:
import dask.dataframe as dd

my_sum = dd.Aggregation("my_sum", lambda x: x.sum(), lambda x: x.sum())

c.register_aggregation(my_sum, "my_sum", [("x", np.float64)], np.float64)

c.sql("SELECT MY_SUM(x) FROM dask").compute()
[14]:
"MY_SUM"("dask"."x")
0 -781.618678
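Since aggregations are meant to reduce grouped results, the same MY_SUM can also be applied per group; a small follow-up sketch in the style of the queries above:

# apply the custom aggregation within a GROUP BY
c.sql("""
    SELECT name, MY_SUM(x)
    FROM dask
    GROUP BY name
""").compute()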

Machine learning in SQL

Dask-SQL has support for both model training and prediction, enabling machine learning workflows with a flexible combination of both Python and SQL. A model can be registered in a Context either through the register_model method or a CREATE MODEL statement.

[15]:
from sklearn.ensemble import GradientBoostingClassifier

# create a scikit-learn model and train it
model = GradientBoostingClassifier()
data = c.sql("SELECT x, y, x * y > 0 AS target FROM dask LIMIT 50")
model.fit(data[["x", "y"]], data["target"])

# register this model in the context
c.register_model("python_model", model, training_columns=["x", "y"])

# create and train a model directly from SQL
c.sql("""
    CREATE MODEL sql_model WITH (
        model_class = 'sklearn.ensemble.GradientBoostingClassifier',
        wrap_predict = True,
        target_column = 'target'
    ) AS (
        SELECT x, y, x * y > 0 AS target
        FROM dask
        LIMIT 50
    )
""")

Registered models must follow the scikit-learn interface by implementing a predict method. As with tables, all registered models can be listed with a SHOW MODELS statement.

[16]:
c.sql("SHOW MODELS").compute()
[16]:
Models
0 python_model
1 sql_model
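Because registration only requires an object exposing a scikit-learn-style predict method, a hand-written model can in principle be registered in the same way; a non-executed sketch using a hypothetical ThresholdModel class:

# a hypothetical "model": any object with a predict method will do
class ThresholdModel:
    def predict(self, X):
        # mirror the target defined above: True where x * y > 0
        return (X["x"] * X["y"]) > 0

# register it like any other model (not run in this notebook)
c.register_model("threshold_model", ThresholdModel(), training_columns=["x", "y"])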

From here, the models can be used to make predictions using the PREDICT keyword as part of a SELECT query.

[17]:
c.sql("""
    SELECT * FROM PREDICT (
        MODEL sql_model,
        SELECT x, y, x * y > 0 AS actual FROM dask
        OFFSET 50
    )
""").compute()
[17]:
x y actual target
timestamp
2000-01-01 00:00:50 -0.508541 -0.018462 True True
2000-01-01 00:00:51 0.652920 -0.847008 False False
2000-01-01 00:00:52 -0.779734 0.117797 False False
2000-01-01 00:00:53 0.360605 -0.965205 False False
2000-01-01 00:00:54 -0.475373 0.652320 False False
... ... ... ... ...
2000-01-30 23:59:55 -0.831409 0.772225 False False
2000-01-30 23:59:56 -0.707013 0.682714 False False
2000-01-30 23:59:57 -0.223391 0.447740 False False
2000-01-30 23:59:58 0.063943 -0.709424 False False
2000-01-30 23:59:59 -0.700149 -0.723991 True True

2591950 rows × 4 columns
