{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Operating on Dask Dataframes with SQL\n", "\n", "[Dask-SQL](https://dask-sql.readthedocs.io/en/stable/) is an open source project and Python package leveraging [Apache Calcite](https://calcite.apache.org/) to provide a SQL frontend for [Dask](https://dask.org/) dataframe operations, allowing SQL users to take advantage of Dask's distributed capabilities without requiring an extensive knowledge of the dataframe API." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:39:57.953904Z", "iopub.status.busy": "2022-06-17T02:39:57.953455Z", "iopub.status.idle": "2022-06-17T02:40:04.457394Z", "shell.execute_reply": "2022-06-17T02:40:04.456610Z" }, "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Collecting dask-sql\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ " Downloading dask_sql-2022.6.0-py3-none-any.whl (21.1 MB)\r\n", "\u001b[?25l \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m0.0/21.1 MB\u001b[0m \u001b[31m?\u001b[0m eta \u001b[36m-:--:--\u001b[0m" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\r", "\u001b[2K \u001b[91m━━━━━━\u001b[0m\u001b[91m╸\u001b[0m\u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m3.5/21.1 MB\u001b[0m \u001b[31m107.1 MB/s\u001b[0m eta \u001b[36m0:00:01\u001b[0m" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\r", "\u001b[2K \u001b[91m━━━━━━━━━━━━━━\u001b[0m\u001b[90m╺\u001b[0m\u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m7.6/21.1 MB\u001b[0m \u001b[31m110.9 MB/s\u001b[0m eta \u001b[36m0:00:01\u001b[0m\r", "\u001b[2K \u001b[91m━━━━━━━━━━━━━━━━━━━━━\u001b[0m\u001b[91m╸\u001b[0m\u001b[90m━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m11.7/21.1 MB\u001b[0m \u001b[31m115.3 MB/s\u001b[0m eta \u001b[36m0:00:01\u001b[0m" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\r", "\u001b[2K 
\u001b[91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m\u001b[90m╺\u001b[0m\u001b[90m━━━━━━━━━\u001b[0m \u001b[32m15.9/21.1 MB\u001b[0m \u001b[31m116.8 MB/s\u001b[0m eta \u001b[36m0:00:01\u001b[0m" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\r", "\u001b[2K \u001b[91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m\u001b[91m╸\u001b[0m\u001b[90m━\u001b[0m \u001b[32m20.3/21.1 MB\u001b[0m \u001b[31m117.2 MB/s\u001b[0m eta \u001b[36m0:00:01\u001b[0m\r", "\u001b[2K \u001b[91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m\u001b[91m╸\u001b[0m \u001b[32m21.1/21.1 MB\u001b[0m \u001b[31m115.9 MB/s\u001b[0m eta \u001b[36m0:00:01\u001b[0m" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\r", "\u001b[2K \u001b[91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m\u001b[91m╸\u001b[0m \u001b[32m21.1/21.1 MB\u001b[0m \u001b[31m115.9 MB/s\u001b[0m eta \u001b[36m0:00:01\u001b[0m\r", "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m21.1/21.1 MB\u001b[0m \u001b[31m59.9 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\r\n", "\u001b[?25h" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Collecting uvicorn>=0.11.3\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ " Downloading uvicorn-0.17.6-py3-none-any.whl (53 kB)\r\n", "\u001b[?25l \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m0.0/53.6 KB\u001b[0m \u001b[31m?\u001b[0m eta \u001b[36m-:--:--\u001b[0m\r", "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m53.6/53.6 KB\u001b[0m \u001b[31m14.5 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\r\n", "\u001b[?25hRequirement already satisfied: prompt-toolkit in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (3.0.29)\r\n", "Collecting tzlocal>=2.1\r\n", " Downloading tzlocal-4.2-py3-none-any.whl (19 kB)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Collecting fastapi>=0.61.1\r\n" ] }, { "name": "stdout", 
"output_type": "stream", "text": [ " Downloading fastapi-0.78.0-py3-none-any.whl (54 kB)\r\n", "\u001b[?25l \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m0.0/54.6 KB\u001b[0m \u001b[31m?\u001b[0m eta \u001b[36m-:--:--\u001b[0m\r", "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m54.6/54.6 KB\u001b[0m \u001b[31m15.7 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\r\n", "\u001b[?25hRequirement already satisfied: pandas>=1.0.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (1.4.2)\r\n", "Requirement already satisfied: dask[dataframe,distributed]<=2022.5.2,>=2022.3.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (2022.5.0)\r\n", "Requirement already satisfied: tabulate in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (0.8.9)\r\n", "Requirement already satisfied: nest-asyncio in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (1.5.5)\r\n", "Requirement already satisfied: pygments in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask-sql) (2.12.0)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Collecting jpype1>=1.0.2\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ " Downloading JPype1-1.4.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl (453 kB)\r\n", "\u001b[?25l \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m0.0/453.8 KB\u001b[0m \u001b[31m?\u001b[0m eta \u001b[36m-:--:--\u001b[0m\r", "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m453.8/453.8 KB\u001b[0m \u001b[31m59.5 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\r\n", "\u001b[?25h" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: fsspec>=0.6.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from 
dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2022.3.0)\r\n", "Requirement already satisfied: pyyaml>=5.3.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (6.0)\r\n", "Requirement already satisfied: partd>=0.3.10 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.2.0)\r\n", "Requirement already satisfied: packaging>=20.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (21.3)\r\n", "Requirement already satisfied: toolz>=0.8.2 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (0.11.2)\r\n", "Requirement already satisfied: cloudpickle>=1.1.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2.0.0)\r\n", "Requirement already satisfied: numpy>=1.18 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.22.3)\r\n", "Requirement already satisfied: distributed==2022.05.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2022.5.0)\r\n", "Requirement already satisfied: tblib>=1.6.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.7.0)\r\n", "Requirement already satisfied: click>=6.6 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (8.1.3)\r\n", "Requirement already satisfied: locket>=1.0.0 in 
/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.0.0)\r\n", "Requirement already satisfied: urllib3 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.26.9)\r\n", "Requirement already satisfied: zict>=0.1.3 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2.2.0)\r\n", "Requirement already satisfied: sortedcontainers!=2.0.0,!=2.0.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2.4.0)\r\n", "Requirement already satisfied: tornado>=6.0.3 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (6.1)\r\n", "Requirement already satisfied: psutil>=5.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (5.9.0)\r\n", "Requirement already satisfied: msgpack>=0.6.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.0.3)\r\n", "Requirement already satisfied: jinja2 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (3.1.1)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Collecting starlette==0.19.1\r\n", " Downloading starlette-0.19.1-py3-none-any.whl (63 kB)\r\n", "\u001b[?25l \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m0.0/63.3 KB\u001b[0m 
\u001b[31m?\u001b[0m eta \u001b[36m-:--:--\u001b[0m\r", "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m63.3/63.3 KB\u001b[0m \u001b[31m18.9 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\r\n", "\u001b[?25hRequirement already satisfied: pydantic!=1.7,!=1.7.1,!=1.7.2,!=1.7.3,!=1.8,!=1.8.1,<2.0.0,>=1.6.2 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from fastapi>=0.61.1->dask-sql) (1.9.1)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: anyio<5,>=3.4.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from starlette==0.19.1->fastapi>=0.61.1->dask-sql) (3.5.0)\r\n", "Requirement already satisfied: typing-extensions>=3.10.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from starlette==0.19.1->fastapi>=0.61.1->dask-sql) (4.2.0)\r\n", "Requirement already satisfied: python-dateutil>=2.8.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from pandas>=1.0.0->dask-sql) (2.8.2)\r\n", "Requirement already satisfied: pytz>=2020.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from pandas>=1.0.0->dask-sql) (2022.1)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Collecting pytz-deprecation-shim\r\n", " Downloading pytz_deprecation_shim-0.1.0.post0-py2.py3-none-any.whl (15 kB)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Collecting h11>=0.8\r\n", " Downloading h11-0.13.0-py3-none-any.whl (58 kB)\r\n", "\u001b[?25l \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m0.0/58.2 KB\u001b[0m \u001b[31m?\u001b[0m eta \u001b[36m-:--:--\u001b[0m\r", "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m58.2/58.2 KB\u001b[0m \u001b[31m17.9 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\r\n", "\u001b[?25h" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Collecting asgiref>=3.4.0\r\n", " 
Downloading asgiref-3.5.2-py3-none-any.whl (22 kB)\r\n", "Requirement already satisfied: wcwidth in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from prompt-toolkit->dask-sql) (0.2.5)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: pyparsing!=3.0.5,>=2.0.2 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from packaging>=20.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (3.0.8)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: six>=1.5 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from python-dateutil>=2.8.1->pandas>=1.0.0->dask-sql) (1.16.0)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Collecting tzdata\r\n", " Downloading tzdata-2022.1-py2.py3-none-any.whl (339 kB)\r\n", "\u001b[?25l \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m0.0/339.5 KB\u001b[0m \u001b[31m?\u001b[0m eta \u001b[36m-:--:--\u001b[0m\r", "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m339.5/339.5 KB\u001b[0m \u001b[31m55.0 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\r\n", "\u001b[?25h" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: idna>=2.8 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from anyio<5,>=3.4.0->starlette==0.19.1->fastapi>=0.61.1->dask-sql) (3.3)\r\n", "Requirement already satisfied: sniffio>=1.1 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from anyio<5,>=3.4.0->starlette==0.19.1->fastapi>=0.61.1->dask-sql) (1.2.0)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: heapdict in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from zict>=0.1.3->distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (1.0.1)\r\n", "Requirement already 
satisfied: MarkupSafe>=2.0 in /usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages (from jinja2->distributed==2022.05.0->dask[dataframe,distributed]<=2022.5.2,>=2022.3.0->dask-sql) (2.1.1)\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Installing collected packages: tzdata, jpype1, h11, asgiref, uvicorn, starlette, pytz-deprecation-shim, tzlocal, fastapi, dask-sql\r\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Successfully installed asgiref-3.5.2 dask-sql-2022.6.0 fastapi-0.78.0 h11-0.13.0 jpype1-1.4.0 pytz-deprecation-shim-0.1.0.post0 starlette-0.19.1 tzdata-2022.1 tzlocal-4.2 uvicorn-0.17.6\r\n" ] } ], "source": [ "! pip install dask-sql" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set up a Dask cluster\n", "\n", "Setting up a Dask [Cluster](https://docs.dask.org/en/latest/deploying.html) is optional, but can dramatically expand our options for distributed computation by giving us access to Dask workers on GPUs, remote machines, common cloud providers, and more.\n", "Additionally, connecting our cluster to a Dask [Client](https://distributed.dask.org/en/stable/client.html) will give us access to a dashboard, which can be used to monitor the progress of active computations and diagnose issues.\n", "\n", "For this notebook, we will create a local cluster and connect it to a client.\n", "Once the client has been created, a link will appear to its associated dashboard, which can be viewed throughout the following computations." ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:04.462177Z", "iopub.status.busy": "2022-06-17T02:40:04.461287Z", "iopub.status.idle": "2022-06-17T02:40:06.727404Z", "shell.execute_reply": "2022-06-17T02:40:06.726553Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-cb35a4f9-ede6-11ec-9d56-000d3a5c8937

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Cluster objectCluster type: distributed.LocalCluster
\n", " Dashboard: http://127.0.0.1:8787/status\n", "
\n", "\n", " \n", "
\n", "

Cluster Info

\n", "
\n", "
\n", "
\n", "
\n", "

LocalCluster

\n", "

fb536d56

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", " \n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Workers: 2\n", "
\n", " Total threads: 4\n", " \n", " Total memory: 1.86 GiB\n", "
Status: runningUsing processes: True
\n", "\n", "
\n", " \n", "

Scheduler Info

\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-2b52b28b-c792-4358-a32e-db8c15e4091d

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://127.0.0.1:33287\n", " \n", " Workers: 2\n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Total threads: 4\n", "
\n", " Started: Just now\n", " \n", " Total memory: 1.86 GiB\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 0

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:43063\n", " \n", " Total threads: 2\n", "
\n", " Dashboard: http://127.0.0.1:43895/status\n", " \n", " Memory: 0.93 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:36919\n", "
\n", " Local directory: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-k7pnu1xl\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 1

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:46061\n", " \n", " Total threads: 2\n", "
\n", " Dashboard: http://127.0.0.1:44123/status\n", " \n", " Memory: 0.93 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:43725\n", "
\n", " Local directory: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-pvks919x\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client\n", "\n", "client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a context\n", "\n", "A `dask_sql.Context` is the Python equivalent to a SQL database, serving as an interface to register all tables and functions used in SQL queries, as well as execute the queries themselves.\n", "In typical usage, a single `Context` is created and used for the duration of a Python script or notebook." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:06.731557Z", "iopub.status.busy": "2022-06-17T02:40:06.731240Z", "iopub.status.idle": "2022-06-17T02:40:08.871860Z", "shell.execute_reply": "2022-06-17T02:40:08.871084Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask_sql/java.py:39: UserWarning: You are running in a conda environment, but the JAVA_PATH is not using it. 
If this is by mistake, set $JAVA_HOME to /usr/share/miniconda3/envs/dask-examples, instead of /usr/lib/jvm/temurin-11-jdk-amd64.\n", " warnings.warn(\n" ] } ], "source": [ "from dask_sql import Context\n", "c = Context()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load and register data\n", "\n", "Once a `Context` has been created, there are a variety of ways to register tables in it.\n", "The simplest way to do this is through the `create_table` method, which accepts a variety of input types which Dask-SQL then uses to infer the table creation method.\n", "Supported input types include:\n", "\n", "- Dask / [Pandas](https://pandas.pydata.org/)-like dataframes\n", "- String locations of local or remote datasets\n", "- [Apache Hive](https://github.com/apache/hive) tables served through [PyHive](https://github.com/dropbox/PyHive) or [SQLAlchemy](https://www.sqlalchemy.org/)\n", "\n", "Input type can also be specified explicitly by providing a `format`.\n", "When being registered, tables can optionally be persisted into memory by passing `persist=True`, which can greatly speed up repeated queries on the same table at the cost of loading the entire table into memory.\n", "For more information, see [Data Loading and Input](https://dask-sql.readthedocs.io/en/latest/pages/data_input.html)." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:08.875506Z", "iopub.status.busy": "2022-06-17T02:40:08.875236Z", "iopub.status.idle": "2022-06-17T02:40:09.120572Z", "shell.execute_reply": "2022-06-17T02:40:09.119963Z" } }, "outputs": [], "source": [ "import pandas as pd\n", "from dask.datasets import timeseries\n", "\n", "# register and persist a dask table\n", "ddf = timeseries()\n", "c.create_table(\"dask\", ddf, persist=True)\n", "\n", "# register a pandas table (implicitly converted to a dask table)\n", "df = pd.DataFrame({\"a\": [1, 2, 3]})\n", "c.create_table(\"pandas\", df)\n", "\n", "# register a table from local storage; kwargs are passed on to the underlying table creation method\n", "c.create_table(\n", " \"local\",\n", " \"surveys/data/2021-user-survey-results.csv.gz\",\n", " format=\"csv\",\n", " parse_dates=['Timestamp'],\n", " blocksize=None\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Tables can also be registered through SQL `CREATE TABLE WITH` or `CREATE TABLE AS` statements, using the `sql` method." 
] }, { "cell_type": "code", "execution_count": 5, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:09.127341Z", "iopub.status.busy": "2022-06-17T02:40:09.127022Z", "iopub.status.idle": "2022-06-17T02:40:12.003604Z", "shell.execute_reply": "2022-06-17T02:40:12.002924Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/share/miniconda3/envs/dask-examples/lib/python3.9/site-packages/dask/dataframe/io/csv.py:533: UserWarning: Warning gzip compression does not support breaking apart files\n", "Please ensure that each individual file can fit in memory and\n", "use the keyword ``blocksize=None to remove this message``\n", "Setting ``blocksize=None``\n", " warn(\n" ] } ], "source": [ "# replace our table from local storage\n", "c.sql(\"\"\"\n", " CREATE OR REPLACE TABLE\n", " \"local\"\n", " WITH (\n", " location = 'surveys/data/2021-user-survey-results.csv.gz',\n", " format = 'csv',\n", " parse_dates = ARRAY [ 'Timestamp' ]\n", " )\n", "\"\"\")\n", "\n", "# create a new table from a SQL query\n", "c.sql(\"\"\"\n", " CREATE TABLE filtered AS (\n", " SELECT id, name FROM dask WHERE name = 'Zelda'\n", " )\n", "\"\"\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "All registered tables can be listed with a `SHOW TABLES` statement." ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:12.042424Z", "iopub.status.busy": "2022-06-17T02:40:12.039528Z", "iopub.status.idle": "2022-06-17T02:40:12.207930Z", "shell.execute_reply": "2022-06-17T02:40:12.206501Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Table
0dask
1pandas
2local
3filtered
\n", "
" ], "text/plain": [ " Table\n", "0 dask\n", "1 pandas\n", "2 local\n", "3 filtered" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "c.sql(\"SHOW TABLES FROM root\").compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask-SQL currently offers experimental GPU support, powered by the [RAPIDS](https://rapids.ai/) suite of open source GPU data science libraries.\n", "Input support is currently limited to Dask / Pandas-like dataframes and data in local/remote storage, and though most queries run without issue, users should expect some bugs or undefined behavior.\n", "To register a table and mark it for use on GPUs, `gpu=True` can be passed to a standard `create_table` call, or its equivalent `CREATE TABLE WITH` query (note that this requires [cuDF and Dask-cuDF](https://github.com/rapidsai/cudf)).\n", "\n", "```python\n", "# register a dask table for use on GPUs (not possible in this binder)\n", "c.create_table(\"gpu_dask\", ddf, gpu=True)\n", "\n", "# load in a table from disk using GPU-accelerated IO operations\n", "c.sql(\"\"\"\n", " CREATE TABLE\n", " \"gpu_local\"\n", " WITH (\n", " location = 'surveys/data/2021-user-survey-results.csv.gz',\n", " format = 'csv',\n", " parse_dates = ARRAY [ 'Timestamp' ],\n", " gpu = True\n", " )\n", "\"\"\")\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Query the data\n", "\n", "When the `sql` method is called, Dask-SQL hands the query off to Apache Calcite to convert into a relational algebra - essentially a list of SQL tasks that must be executed in order to get a result.\n", "The relational algebra of any query can be viewed directly using the `explain` method." 
] }, { "cell_type": "code", "execution_count": 7, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:12.211607Z", "iopub.status.busy": "2022-06-17T02:40:12.211333Z", "iopub.status.idle": "2022-06-17T02:40:12.344554Z", "shell.execute_reply": "2022-06-17T02:40:12.343822Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DaskProject(EXPR$0=[/(CAST(CASE(=($1, 0), null:DOUBLE, $0)):DECIMAL(19, 15), $1)]): rowcount = 10.0, cumulative cost = {122.5 rows, 111.0 cpu, 0.0 io}, id = 83\n", " DaskAggregate(group=[{}], agg#0=[$SUM0($2)], agg#1=[COUNT($2)]): rowcount = 10.0, cumulative cost = {112.5 rows, 101.0 cpu, 0.0 io}, id = 82\n", " DaskTableScan(table=[[root, dask]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 77\n", "\n" ] } ], "source": [ "print(c.explain(\"SELECT AVG(x) FROM dask\"))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "From here, this relational algebra is then converted into a Dask computation graph, which ultimately returns (or in the case of `CREATE TABLE` statements, implicitly assigns) a Dask dataframe." ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:12.348191Z", "iopub.status.busy": "2022-06-17T02:40:12.347988Z", "iopub.status.idle": "2022-06-17T02:40:12.456463Z", "shell.execute_reply": "2022-06-17T02:40:12.455711Z" } }, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
AVG(\"dask\".\"x\")
npartitions=1
float64
...
\n", "
\n", "
Dask Name: rename, 107 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " AVG(\"dask\".\"x\")\n", "npartitions=1 \n", " float64\n", " ...\n", "Dask Name: rename, 107 tasks" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "c.sql(\"SELECT AVG(x) FROM dask\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask dataframes are lazy, meaning that at the time of their creation, none of their dependent tasks have been executed yet.\n", "To actually execute these tasks and get a result, we must call `compute`." ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:12.460469Z", "iopub.status.busy": "2022-06-17T02:40:12.460270Z", "iopub.status.idle": "2022-06-17T02:40:12.846692Z", "shell.execute_reply": "2022-06-17T02:40:12.846145Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
AVG(\"dask\".\"x\")
00.00018
\n", "
" ], "text/plain": [ " AVG(\"dask\".\"x\")\n", "0 0.00018" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "c.sql(\"SELECT AVG(x) FROM dask\").compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Looking at the dashboard, we can see that executing this query has triggered some Dask computations.\n", "\n", "Because the return value of a query is a Dask dataframe, it is also possible to do follow-up operations on it using Dask's dataframe API.\n", "This can be useful if we want to perform some complex operations on a dataframe that are not possible through Dask, then follow up with some simpler operations that can easily be expressed through the dataframe API." ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:12.850144Z", "iopub.status.busy": "2022-06-17T02:40:12.849807Z", "iopub.status.idle": "2022-06-17T02:40:19.547792Z", "shell.execute_reply": "2022-06-17T02:40:19.547257Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
xy
name
Alice-46.355731-0.002628
Bob-153.4730730.001757
Charlie-276.880250-0.000068
Dan-351.566539-0.000392
Edith188.9865330.001185
Frank207.996062-0.001484
George190.9867540.004039
Hannah108.173164-0.001052
Ingrid261.7088120.000031
Jerry113.5912180.000775
Kevin49.522584-0.001692
Laura-157.1841390.000718
Michael407.9875070.001060
Norbert-179.051510-0.001432
Oliver-47.3591330.002052
Patricia77.6116340.003366
Quinn-107.462379-0.000059
Ray356.9192660.000792
Sarah-101.2884260.001744
Tim197.774282-0.000359
Ursula-176.0888410.001289
Victor241.2465720.000557
Wendy134.076252-0.000463
Xavier-195.1604090.000138
Yvonne-46.064462-0.003115
Zelda-232.4271030.001937
\n", "
" ], "text/plain": [ " x y\n", "name \n", "Alice -46.355731 -0.002628\n", "Bob -153.473073 0.001757\n", "Charlie -276.880250 -0.000068\n", "Dan -351.566539 -0.000392\n", "Edith 188.986533 0.001185\n", "Frank 207.996062 -0.001484\n", "George 190.986754 0.004039\n", "Hannah 108.173164 -0.001052\n", "Ingrid 261.708812 0.000031\n", "Jerry 113.591218 0.000775\n", "Kevin 49.522584 -0.001692\n", "Laura -157.184139 0.000718\n", "Michael 407.987507 0.001060\n", "Norbert -179.051510 -0.001432\n", "Oliver -47.359133 0.002052\n", "Patricia 77.611634 0.003366\n", "Quinn -107.462379 -0.000059\n", "Ray 356.919266 0.000792\n", "Sarah -101.288426 0.001744\n", "Tim 197.774282 -0.000359\n", "Ursula -176.088841 0.001289\n", "Victor 241.246572 0.000557\n", "Wendy 134.076252 -0.000463\n", "Xavier -195.160409 0.000138\n", "Yvonne -46.064462 -0.003115\n", "Zelda -232.427103 0.001937" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# perform a multi-column sort that isn't possible in Dask\n", "res = c.sql(\"\"\"\n", " SELECT * FROM dask ORDER BY name ASC, id DESC, x ASC\n", "\"\"\")\n", "\n", "# now do some follow groupby aggregations\n", "res.groupby(\"name\").agg({\"x\": \"sum\", \"y\": \"mean\"}).compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Custom functions and aggregations\n", "\n", "When standard SQL functionality is insufficient, it is possible to register custom functions for use in queries.\n", "These functions can be classified as one of the following:\n", "\n", "- Column-wise functions\n", "- Row-wise functions\n", "- Aggregations\n", "\n", "### Column-wise functions\n", "\n", "Column-wise functions can take columns or literal values as input and return a column of an identical length.\n", "Column-wise functions can be registered in a `Context` using the `register_function` method." 
] }, { "cell_type": "code", "execution_count": 11, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:19.552726Z", "iopub.status.busy": "2022-06-17T02:40:19.552137Z", "iopub.status.idle": "2022-06-17T02:40:19.558584Z", "shell.execute_reply": "2022-06-17T02:40:19.557489Z" } }, "outputs": [], "source": [ "import numpy as np\n", "\n", "def f(x):\n", " return x ** 2\n", "\n", "c.register_function(f, \"f\", [(\"x\", np.float64)], np.float64)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Function registration requires the following inputs:\n", "\n", "- A callable function\n", "- A name for the function to be referred to in queries\n", "- A list of tuples, representing the input variables and their respective types, which can be either Pandas or [NumPy](https://numpy.org/) types\n", "- A type for the output column\n", "\n", "Once a function has been registered, it can be called like any other standard SQL function." ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:19.561312Z", "iopub.status.busy": "2022-06-17T02:40:19.560923Z", "iopub.status.idle": "2022-06-17T02:40:20.180877Z", "shell.execute_reply": "2022-06-17T02:40:20.179707Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\"F\"(\"dask\".\"x\")
timestamp
2000-01-01 00:00:000.640374
2000-01-01 00:00:010.271517
2000-01-01 00:00:020.000299
2000-01-01 00:00:030.010415
2000-01-01 00:00:040.469472
......
2000-01-30 23:59:550.905310
2000-01-30 23:59:560.982045
2000-01-30 23:59:570.404257
2000-01-30 23:59:580.563177
2000-01-30 23:59:590.002484
\n", "

2592000 rows × 1 columns

\n", "
" ], "text/plain": [ " \"F\"(\"dask\".\"x\")\n", "timestamp \n", "2000-01-01 00:00:00 0.640374\n", "2000-01-01 00:00:01 0.271517\n", "2000-01-01 00:00:02 0.000299\n", "2000-01-01 00:00:03 0.010415\n", "2000-01-01 00:00:04 0.469472\n", "... ...\n", "2000-01-30 23:59:55 0.905310\n", "2000-01-30 23:59:56 0.982045\n", "2000-01-30 23:59:57 0.404257\n", "2000-01-30 23:59:58 0.563177\n", "2000-01-30 23:59:59 0.002484\n", "\n", "[2592000 rows x 1 columns]" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "c.sql(\"SELECT F(x) FROM dask\").compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Row-wise functions\n", "\n", "In some cases, it may be easier to write a custom function that processes a dict-like `row` object - otherwise known as a row-wise function.\n", "These functions can also be registered using `register_function` by passing `row_udf=True`, and used in the same manner as a column-wise function." ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:20.184328Z", "iopub.status.busy": "2022-06-17T02:40:20.183995Z", "iopub.status.idle": "2022-06-17T02:40:50.819352Z", "shell.execute_reply": "2022-06-17T02:40:50.818492Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\"G\"(\"dask\".\"x\", \"dask\".\"y\")
timestamp
2000-01-01 00:00:000.455330
2000-01-01 00:00:010.958461
2000-01-01 00:00:020.417149
2000-01-01 00:00:030.149473
2000-01-01 00:00:041.085795
......
2000-01-30 23:59:551.942582
2000-01-30 23:59:560.608836
2000-01-30 23:59:570.148307
2000-01-30 23:59:581.102380
2000-01-30 23:59:590.286114
\n", "

2592000 rows × 1 columns

\n", "
" ], "text/plain": [ " \"G\"(\"dask\".\"x\", \"dask\".\"y\")\n", "timestamp \n", "2000-01-01 00:00:00 0.455330\n", "2000-01-01 00:00:01 0.958461\n", "2000-01-01 00:00:02 0.417149\n", "2000-01-01 00:00:03 0.149473\n", "2000-01-01 00:00:04 1.085795\n", "... ...\n", "2000-01-30 23:59:55 1.942582\n", "2000-01-30 23:59:56 0.608836\n", "2000-01-30 23:59:57 0.148307\n", "2000-01-30 23:59:58 1.102380\n", "2000-01-30 23:59:59 0.286114\n", "\n", "[2592000 rows x 1 columns]" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def g(row):\n", " if row[\"x\"] > row[\"y\"]:\n", " return row[\"x\"] - row[\"y\"]\n", " return row[\"y\"] - row[\"x\"]\n", "\n", "c.register_function(g, \"g\", [(\"x\", np.float64), (\"y\", np.float64)], np.float64, row_udf=True)\n", "\n", "c.sql(\"SELECT G(x, y) FROM dask\").compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that unlike column-wise functions, which are called directly using specified columns and literals as input, row-wise functions are called using `apply`, which can have unpredictable performance depending on the underlying dataframe library (e.g. Pandas, cuDF) and the function itself.\n", "\n", "### Aggregations\n", "\n", "Aggregations take a single column as input and return a single value - thus, they can only be used to reduce the results of a `GROUP BY` query.\n", "Aggregations can be registered using the `register_aggregation` method, which is functionally similar to `register_function` but takes a Dask [Aggregation](https://docs.dask.org/en/latest/dataframe-groupby.html#aggregate) as input instead of a callable function." ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:50.822415Z", "iopub.status.busy": "2022-06-17T02:40:50.821996Z", "iopub.status.idle": "2022-06-17T02:40:51.019519Z", "shell.execute_reply": "2022-06-17T02:40:51.018783Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\"MY_SUM\"(\"dask\".\"x\")
0466.218645
\n", "
" ], "text/plain": [ " \"MY_SUM\"(\"dask\".\"x\")\n", "0 466.218645" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask.dataframe as dd\n", "\n", "my_sum = dd.Aggregation(\"my_sum\", lambda x: x.sum(), lambda x: x.sum())\n", "\n", "c.register_aggregation(my_sum, \"my_sum\", [(\"x\", np.float64)], np.float64)\n", "\n", "c.sql(\"SELECT MY_SUM(x) FROM dask\").compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Machine learning in SQL\n", "\n", "Dask-SQL has support for both model training and prediction, enabling machine learning workflows with a flexible combination of both Python and SQL.\n", "A model can be registered in a `Context` either through the `register_model` method or a `CREATE MODEL` statement." ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:51.022485Z", "iopub.status.busy": "2022-06-17T02:40:51.022160Z", "iopub.status.idle": "2022-06-17T02:40:52.715458Z", "shell.execute_reply": "2022-06-17T02:40:52.713626Z" } }, "outputs": [], "source": [ "from dask_ml.linear_model import LinearRegression\n", "from sklearn.ensemble import GradientBoostingClassifier\n", "\n", "# create a dask-ml model and train it\n", "model = GradientBoostingClassifier()\n", "data = c.sql(\"SELECT x, y, x * y > 0 AS target FROM dask LIMIT 50\")\n", "model.fit(data[[\"x\", \"y\"]], data[\"target\"])\n", "\n", "# register this model in the context\n", "c.register_model(\"python_model\", model, training_columns=[\"x\", \"y\"])\n", "\n", "# create and train a model directly from SQL\n", "c.sql(\"\"\"\n", " CREATE MODEL sql_model WITH (\n", " model_class = 'sklearn.ensemble.GradientBoostingClassifier',\n", " wrap_predict = True,\n", " target_column = 'target'\n", " ) AS (\n", " SELECT x, y, x * y > 0 AS target\n", " FROM dask\n", " LIMIT 50\n", " )\n", "\"\"\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Registered models 
must follow the [scikit-learn](https://scikit-learn.org/stable/index.html) interface by implementing a `predict` method.\n", "As with tables, all registered models can be listed with a `SHOW MODEL` statement." ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:52.719623Z", "iopub.status.busy": "2022-06-17T02:40:52.719311Z", "iopub.status.idle": "2022-06-17T02:40:52.767973Z", "shell.execute_reply": "2022-06-17T02:40:52.767404Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
Models
0python_model
1sql_model
\n", "
" ], "text/plain": [ " Models\n", "0 python_model\n", "1 sql_model" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "c.sql(\"SHOW MODELS\").compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "From here, the models can be used to make predictions using the `PREDICT` keyword as part of a `SELECT` query." ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "execution": { "iopub.execute_input": "2022-06-17T02:40:52.771581Z", "iopub.status.busy": "2022-06-17T02:40:52.771202Z", "iopub.status.idle": "2022-06-17T02:40:57.366341Z", "shell.execute_reply": "2022-06-17T02:40:57.365657Z" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
xyactualtarget
timestamp
2000-01-01 00:00:50-0.381223-0.717676TrueTrue
2000-01-01 00:00:51-0.609896-0.186250TrueTrue
2000-01-01 00:00:520.368642-0.927883FalseFalse
2000-01-01 00:00:530.514496-0.815545FalseFalse
2000-01-01 00:00:540.228430-0.519825FalseFalse
...............
2000-01-30 23:59:55-0.9514780.991105FalseFalse
2000-01-30 23:59:560.9909820.382146TrueTrue
2000-01-30 23:59:57-0.635812-0.487506TrueTrue
2000-01-30 23:59:580.750451-0.351929FalseFalse
2000-01-30 23:59:59-0.049839-0.335953TrueFalse
\n", "

2591950 rows × 4 columns

\n", "
" ], "text/plain": [ " x y actual target\n", "timestamp \n", "2000-01-01 00:00:50 -0.381223 -0.717676 True True\n", "2000-01-01 00:00:51 -0.609896 -0.186250 True True\n", "2000-01-01 00:00:52 0.368642 -0.927883 False False\n", "2000-01-01 00:00:53 0.514496 -0.815545 False False\n", "2000-01-01 00:00:54 0.228430 -0.519825 False False\n", "... ... ... ... ...\n", "2000-01-30 23:59:55 -0.951478 0.991105 False False\n", "2000-01-30 23:59:56 0.990982 0.382146 True True\n", "2000-01-30 23:59:57 -0.635812 -0.487506 True True\n", "2000-01-30 23:59:58 0.750451 -0.351929 False False\n", "2000-01-30 23:59:59 -0.049839 -0.335953 True False\n", "\n", "[2591950 rows x 4 columns]" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "c.sql(\"\"\"\n", " SELECT * FROM PREDICT (\n", " MODEL sql_model,\n", " SELECT x, y, x * y > 0 AS actual FROM dask\n", " OFFSET 50\n", " )\n", "\"\"\").compute()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 4 }