{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dask Bags\n",
"\n",
"\n",
"Dask Bag implements operations like `map`, `filter`, `groupby` and aggregations on collections of Python objects. It does this in parallel and with a small memory footprint, using Python iterators. It is similar to a parallel version of `itertools` or a Pythonic version of the PySpark RDD.\n",
"\n",
"Dask Bags are often used to do simple preprocessing on log files, JSON records, or other user-defined Python objects.\n",
"\n",
"Full API documentation is available here: http://docs.dask.org/en/latest/bag-api.html"
]
},
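{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a minimal sketch of this idea, the cell below uses a small in-memory list (an illustrative stand-in for real data) instead of files. `db.from_sequence` splits the list into partitions, and `map`, `filter` and `sum` then behave much like their built-in counterparts, except that nothing runs until `compute` is called. The choice of `npartitions=4` is arbitrary."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.bag as db\n",
"\n",
"# Split an ordinary Python list into 4 partitions (an arbitrary choice)\n",
"numbers = db.from_sequence(range(10), npartitions=4)\n",
"\n",
"# Build a lazy pipeline: square each number, keep the even squares, add them up\n",
"total = numbers.map(lambda x: x ** 2).filter(lambda x: x % 2 == 0).sum()\n",
"\n",
"# Nothing has run yet; compute() evaluates the pipeline in parallel\n",
"result = total.compute()\n",
"\n",
"# The same computation with plain Python iterators, for comparison\n",
"expected = sum(x for x in map(lambda x: x ** 2, range(10)) if x % 2 == 0)\n",
"print(result, expected)  # both are 120"
]
},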
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Start Dask Client for Dashboard\n",
"\n",
"Starting the Dask Client is optional. It provides a dashboard that\n",
"is useful for gaining insight into the computation.\n",
"\n",
"The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. Arranging your windows this way can take some effort, but seeing both at the same time is very useful when learning."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:28.391423Z",
"iopub.status.busy": "2021-01-14T10:42:28.390949Z",
"iopub.status.idle": "2021-01-14T10:42:31.245840Z",
"shell.execute_reply": "2021-01-14T10:42:31.246191Z"
}
},
"outputs": [
{
"data": {
"text/html": [
"<table>\n",
"<tr>\n",
"<td>\n",
"<h3>Client</h3>\n",
"</td>\n",
"<td>\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
"  <li><b>Workers: </b>4</li>\n",
"  <li><b>Cores: </b>4</li>\n",
"  <li><b>Memory: </b>7.29 GB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
""
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Client\n",
"client = Client(n_workers=4, threads_per_worker=1)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Random Data\n",
"\n",
"We create a random set of records and write them to disk as JSON files, one file per partition. This will serve as our data for this notebook."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:31.248413Z",
"iopub.status.busy": "2021-01-14T10:42:31.247995Z",
"iopub.status.idle": "2021-01-14T10:42:32.757429Z",
"shell.execute_reply": "2021-01-14T10:42:32.757028Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"['/home/runner/work/dask-examples/dask-examples/data/0.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/1.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/2.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/3.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/4.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/5.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/6.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/7.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/8.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/9.json']"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask\n",
"import json\n",
"import os\n",
"\n",
"os.makedirs('data', exist_ok=True) # Create data/ directory\n",
"\n",
"b = dask.datasets.make_people()  # Make random records of people (needs the mimesis package)\n",
"b.map(json.dumps).to_textfiles('data/*.json') # Encode as JSON, write to disk"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Read JSON data\n",
"\n",
"Now that we have some JSON data in files, let's take a look at it with Dask Bag and Python's `json` module."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:32.768110Z",
"iopub.status.busy": "2021-01-14T10:42:32.767708Z",
"iopub.status.idle": "2021-01-14T10:42:32.924165Z",
"shell.execute_reply": "2021-01-14T10:42:32.924512Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{\"age\": 56, \"name\": [\"Le\", \"Morton\"], \"occupation\": \"Medical Secretary\", \"telephone\": \"591.139.2820\", \"address\": {\"address\": \"1296 Young Bend\", \"city\": \"Great Falls\"}, \"credit-card\": {\"number\": \"3444 778222 54046\", \"expiration-date\": \"08/20\"}}\r\n",
"{\"age\": 17, \"name\": [\"Mary\", \"Pugh\"], \"occupation\": \"Industrial Consultant\", \"telephone\": \"066.481.6992\", \"address\": {\"address\": \"1298 Jamestown Square\", \"city\": \"Mount Clemens\"}, \"credit-card\": {\"number\": \"3400 563471 96728\", \"expiration-date\": \"02/20\"}}\r\n"
]
}
],
"source": [
"!head -n 2 data/0.json"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:32.927315Z",
"iopub.status.busy": "2021-01-14T10:42:32.926419Z",
"iopub.status.idle": "2021-01-14T10:42:32.935576Z",
"shell.execute_reply": "2021-01-14T10:42:32.936280Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"dask.bag"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import dask.bag as db\n",
"import json\n",
"\n",
"b = db.read_text('data/*.json').map(json.loads)\n",
"b"
]
},
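{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same pattern works for any set of files matching a glob. The sketch below writes a few hypothetical records to a temporary directory (rather than reusing `data/`) to show how the pieces fit together: with the default `blocksize=None`, `read_text` creates one partition per file and yields one string per line, and `map(json.loads)` lazily decodes each line into a Python dictionary."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import os\n",
"import tempfile\n",
"\n",
"import dask.bag as db\n",
"\n",
"# Write three small JSON-lines files of hypothetical records to a temporary directory\n",
"tmpdir = tempfile.mkdtemp()\n",
"for i in range(3):\n",
"    with open(os.path.join(tmpdir, f'{i}.json'), 'w') as f:\n",
"        for age in (20 + i, 40 + i):\n",
"            f.write(json.dumps({'age': age}) + '\\n')\n",
"\n",
"# One partition per file; each line is decoded into a dict when computed\n",
"records = db.read_text(os.path.join(tmpdir, '*.json')).map(json.loads)\n",
"print(records.npartitions)        # 3\n",
"print(records.count().compute())  # 6"
]
},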
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:32.938848Z",
"iopub.status.busy": "2021-01-14T10:42:32.938032Z",
"iopub.status.idle": "2021-01-14T10:42:32.958187Z",
"shell.execute_reply": "2021-01-14T10:42:32.959280Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"({'age': 56,\n",
" 'name': ['Le', 'Morton'],\n",
" 'occupation': 'Medical Secretary',\n",
" 'telephone': '591.139.2820',\n",
" 'address': {'address': '1296 Young Bend', 'city': 'Great Falls'},\n",
" 'credit-card': {'number': '3444 778222 54046', 'expiration-date': '08/20'}},\n",
" {'age': 17,\n",
" 'name': ['Mary', 'Pugh'],\n",
" 'occupation': 'Industrial Consultant',\n",
" 'telephone': '066.481.6992',\n",
" 'address': {'address': '1298 Jamestown Square', 'city': 'Mount Clemens'},\n",
" 'credit-card': {'number': '3400 563471 96728', 'expiration-date': '02/20'}})"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"b.take(2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Map, Filter, Aggregate\n",
"\n",
"We can process this data by filtering for only the records of interest, mapping functions over them to transform the data, and aggregating the results into a total value."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:32.963743Z",
"iopub.status.busy": "2021-01-14T10:42:32.961162Z",
"iopub.status.idle": "2021-01-14T10:42:32.979873Z",
"shell.execute_reply": "2021-01-14T10:42:32.979457Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"({'age': 56,\n",
" 'name': ['Le', 'Morton'],\n",
" 'occupation': 'Medical Secretary',\n",
" 'telephone': '591.139.2820',\n",
" 'address': {'address': '1296 Young Bend', 'city': 'Great Falls'},\n",
" 'credit-card': {'number': '3444 778222 54046', 'expiration-date': '08/20'}},\n",
" {'age': 65,\n",
" 'name': ['Glayds', 'Harvey'],\n",
" 'occupation': 'Sales Assistant',\n",
" 'telephone': '(468) 305-6734',\n",
" 'address': {'address': '1070 Doric Drung', 'city': 'Rockwall'},\n",
" 'credit-card': {'number': '4790 9026 6695 6340',\n",
" 'expiration-date': '06/16'}})"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"b.filter(lambda record: record['age'] > 30).take(2) # Select only people over 30"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:32.983611Z",
"iopub.status.busy": "2021-01-14T10:42:32.983143Z",
"iopub.status.idle": "2021-01-14T10:42:33.002458Z",
"shell.execute_reply": "2021-01-14T10:42:33.002779Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"('Medical Secretary', 'Industrial Consultant')"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"b.map(lambda record: record['occupation']).take(2) # Select the occupation field"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:33.005257Z",
"iopub.status.busy": "2021-01-14T10:42:33.004427Z",
"iopub.status.idle": "2021-01-14T10:42:33.121858Z",
"shell.execute_reply": "2021-01-14T10:42:33.121510Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"10000"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"b.count().compute() # Count total number of records"
]
},
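{
"cell_type": "markdown",
"metadata": {},
"source": [
"Often we want several aggregations from the same data. As a sketch with a few hypothetical records (copied from the sample above and trimmed to two fields), `pluck` extracts one field from every record, and `dask.compute` evaluates several lazy results together, so shared steps such as building the bag and plucking the ages run once rather than once per aggregation."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"import dask.bag as db\n",
"\n",
"# A few hypothetical records shaped like the ones above (only two fields kept)\n",
"people = db.from_sequence([\n",
"    {'age': 56, 'occupation': 'Medical Secretary'},\n",
"    {'age': 17, 'occupation': 'Industrial Consultant'},\n",
"    {'age': 65, 'occupation': 'Sales Assistant'},\n",
"    {'age': 33, 'occupation': 'Charge Hand'},\n",
"], npartitions=2)\n",
"\n",
"# pluck('age') is shorthand for map(lambda record: record['age'])\n",
"ages = people.pluck('age')\n",
"\n",
"# Evaluate three aggregations in one call, sharing the common upstream tasks\n",
"mean_age, max_age, n_adults = dask.compute(\n",
"    ages.mean(),\n",
"    ages.max(),\n",
"    people.filter(lambda record: record['age'] >= 18).count(),\n",
")\n",
"print(mean_age, max_age, n_adults)  # 42.75 65 3"
]
},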
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Chain computations\n",
"\n",
"It is common to do many of these steps in one pipeline, only calling `compute` or `take` at the end."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:33.126689Z",
"iopub.status.busy": "2021-01-14T10:42:33.126295Z",
"iopub.status.idle": "2021-01-14T10:42:33.130328Z",
"shell.execute_reply": "2021-01-14T10:42:33.130892Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"dask.bag"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"result = (b.filter(lambda record: record['age'] > 30)\n",
" .map(lambda record: record['occupation'])\n",
" .frequencies(sort=True)\n",
" .topk(10, key=1))\n",
"result"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As with all lazy Dask collections, we need to call `compute` to actually evaluate our result. The `take` method used in earlier examples also triggers computation, although it only evaluates as many partitions as it needs (by default, just the first)."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:33.133455Z",
"iopub.status.busy": "2021-01-14T10:42:33.132537Z",
"iopub.status.idle": "2021-01-14T10:42:33.262949Z",
"shell.execute_reply": "2021-01-14T10:42:33.262589Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"[('Vehicle Assessor', 16),\n",
" ('Housewife', 14),\n",
" ('Cable TV Installer', 13),\n",
" ('Town Clerk', 13),\n",
" ('Investigator', 13),\n",
" ('Fruiterer', 13),\n",
" ('Lathe Operator', 12),\n",
" ('Bank Clerk', 12),\n",
" ('Aromatherapist', 12),\n",
" ('Masseuse', 12)]"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"result.compute()"
]
},
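{
"cell_type": "markdown",
"metadata": {},
"source": [
"`frequencies` is a specialized per-key aggregation. For custom per-key reductions, `Bag.foldby` groups and reduces in one step; the Bag documentation recommends it over `groupby` where possible, because `groupby` needs a full shuffle of the data. The sketch below counts occupations among people over 30 using hypothetical records; the helper names `add_one` and `add` are illustrative."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.bag as db\n",
"\n",
"# Hypothetical records; in this notebook you could use the bag `b` instead\n",
"people = db.from_sequence([\n",
"    {'age': 56, 'occupation': 'Medical Secretary'},\n",
"    {'age': 41, 'occupation': 'Medical Secretary'},\n",
"    {'age': 65, 'occupation': 'Sales Assistant'},\n",
"    {'age': 17, 'occupation': 'Sales Assistant'},\n",
"    {'age': 33, 'occupation': 'Charge Hand'},\n",
"], npartitions=2)\n",
"\n",
"def add_one(count, record):\n",
"    # binop: fold one record into the running count for its key\n",
"    return count + 1\n",
"\n",
"def add(count_a, count_b):\n",
"    # combine: merge the per-partition counts for the same key\n",
"    return count_a + count_b\n",
"\n",
"counts = (people.filter(lambda record: record['age'] > 30)\n",
"                .foldby(key=lambda record: record['occupation'],\n",
"                        binop=add_one, initial=0,\n",
"                        combine=add, combine_initial=0))\n",
"\n",
"print(sorted(counts.compute()))\n",
"# [('Charge Hand', 1), ('Medical Secretary', 2), ('Sales Assistant', 1)]"
]
},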
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Transform and Store\n",
"\n",
"Sometimes we want to compute aggregations as above, but sometimes we want to store results to disk for future analyses. For that we can map `json.dumps` over the records and write them out with `to_textfiles`, or we can convert to a Dask DataFrame (covered in the next section) and use its storage methods."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:33.265201Z",
"iopub.status.busy": "2021-01-14T10:42:33.264803Z",
"iopub.status.idle": "2021-01-14T10:42:33.411818Z",
"shell.execute_reply": "2021-01-14T10:42:33.411421Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"['/home/runner/work/dask-examples/dask-examples/data/processed.0.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/processed.1.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/processed.2.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/processed.3.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/processed.4.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/processed.5.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/processed.6.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/processed.7.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/processed.8.json',\n",
" '/home/runner/work/dask-examples/dask-examples/data/processed.9.json']"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"(b.filter(lambda record: record['age'] > 30) # Select records of interest\n",
" .map(json.dumps) # Convert Python objects to text\n",
" .to_textfiles('data/processed.*.json')) # Write to local disk"
]
},
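{
"cell_type": "markdown",
"metadata": {},
"source": [
"To check what was written, the files can be read back with `read_text`. The sketch below repeats the round trip in a temporary directory with a few hypothetical records. `to_textfiles` writes one file per partition, replacing the `*` in the path with the partition number, and returns the list of paths it wrote."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import os\n",
"import tempfile\n",
"\n",
"import dask.bag as db\n",
"\n",
"# Hypothetical records split into two partitions\n",
"records = db.from_sequence([{'age': 56}, {'age': 17}, {'age': 65}], npartitions=2)\n",
"outdir = tempfile.mkdtemp()\n",
"pattern = os.path.join(outdir, 'processed.*.json')\n",
"\n",
"# Filter, encode as JSON text and write one file per partition\n",
"paths = (records.filter(lambda record: record['age'] > 30)\n",
"                .map(json.dumps)\n",
"                .to_textfiles(pattern))\n",
"print(len(paths))  # 2\n",
"\n",
"# Read the files back and decode each line into a dict again\n",
"reloaded = db.read_text(pattern).map(json.loads)\n",
"print(sorted(record['age'] for record in reloaded.compute()))  # [56, 65]"
]
},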
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Convert to Dask DataFrames\n",
"\n",
"Dask Bags are good for reading in initial data, doing a bit of pre-processing, and then handing off to a more efficient representation such as Dask DataFrames. Dask DataFrames use pandas internally, so they can be much faster on numeric data and offer more sophisticated algorithms.\n",
"\n",
"However, Dask DataFrames also expect data that is organized as flat columns. They do not handle nested JSON data very well (Bag is better for this).\n",
"\n",
"Here we write a function that flattens our nested records, map it across the bag, and then convert the result to a Dask DataFrame."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:33.415401Z",
"iopub.status.busy": "2021-01-14T10:42:33.414990Z",
"iopub.status.idle": "2021-01-14T10:42:33.435883Z",
"shell.execute_reply": "2021-01-14T10:42:33.435498Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"({'age': 56,\n",
" 'name': ['Le', 'Morton'],\n",
" 'occupation': 'Medical Secretary',\n",
" 'telephone': '591.139.2820',\n",
" 'address': {'address': '1296 Young Bend', 'city': 'Great Falls'},\n",
" 'credit-card': {'number': '3444 778222 54046', 'expiration-date': '08/20'}},)"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"b.take(1)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:33.440887Z",
"iopub.status.busy": "2021-01-14T10:42:33.440486Z",
"iopub.status.idle": "2021-01-14T10:42:33.461103Z",
"shell.execute_reply": "2021-01-14T10:42:33.460758Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"({'age': 56,\n",
" 'occupation': 'Medical Secretary',\n",
" 'telephone': '591.139.2820',\n",
" 'credit-card-number': '3444 778222 54046',\n",
" 'credit-card-expiration': '08/20',\n",
" 'name': 'Le Morton',\n",
" 'street-address': '1296 Young Bend',\n",
" 'city': 'Great Falls'},)"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def flatten(record):\n",
" return {\n",
" 'age': record['age'],\n",
" 'occupation': record['occupation'],\n",
" 'telephone': record['telephone'],\n",
" 'credit-card-number': record['credit-card']['number'],\n",
" 'credit-card-expiration': record['credit-card']['expiration-date'],\n",
" 'name': ' '.join(record['name']),\n",
" 'street-address': record['address']['address'],\n",
" 'city': record['address']['city'] \n",
" }\n",
"\n",
"b.map(flatten).take(1)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:33.463628Z",
"iopub.status.busy": "2021-01-14T10:42:33.462693Z",
"iopub.status.idle": "2021-01-14T10:42:33.943028Z",
"shell.execute_reply": "2021-01-14T10:42:33.943401Z"
}
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>age</th>\n",
"      <th>occupation</th>\n",
"      <th>telephone</th>\n",
"      <th>credit-card-number</th>\n",
"      <th>credit-card-expiration</th>\n",
"      <th>name</th>\n",
"      <th>street-address</th>\n",
"      <th>city</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>56</td>\n",
"      <td>Medical Secretary</td>\n",
"      <td>591.139.2820</td>\n",
"      <td>3444 778222 54046</td>\n",
"      <td>08/20</td>\n",
"      <td>Le Morton</td>\n",
"      <td>1296 Young Bend</td>\n",
"      <td>Great Falls</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>17</td>\n",
"      <td>Industrial Consultant</td>\n",
"      <td>066.481.6992</td>\n",
"      <td>3400 563471 96728</td>\n",
"      <td>02/20</td>\n",
"      <td>Mary Pugh</td>\n",
"      <td>1298 Jamestown Square</td>\n",
"      <td>Mount Clemens</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>2</th>\n",
"      <td>65</td>\n",
"      <td>Sales Assistant</td>\n",
"      <td>(468) 305-6734</td>\n",
"      <td>4790 9026 6695 6340</td>\n",
"      <td>06/16</td>\n",
"      <td>Glayds Harvey</td>\n",
"      <td>1070 Doric Drung</td>\n",
"      <td>Rockwall</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>3</th>\n",
"      <td>33</td>\n",
"      <td>Charge Hand</td>\n",
"      <td>+1-(723)-613-3031</td>\n",
"      <td>3797 255724 62716</td>\n",
"      <td>08/24</td>\n",
"      <td>Darius Hunt</td>\n",
"      <td>289 Mabini Nene</td>\n",
"      <td>Bellingham</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>4</th>\n",
"      <td>66</td>\n",
"      <td>Projectionist</td>\n",
"      <td>+1-(588)-266-1704</td>\n",
"      <td>3436 492655 07632</td>\n",
"      <td>03/20</td>\n",
"      <td>Marvin Weber</td>\n",
"      <td>542 Stanyan Loop</td>\n",
"      <td>Ocala</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" age occupation telephone credit-card-number \\\n",
"0 56 Medical Secretary 591.139.2820 3444 778222 54046 \n",
"1 17 Industrial Consultant 066.481.6992 3400 563471 96728 \n",
"2 65 Sales Assistant (468) 305-6734 4790 9026 6695 6340 \n",
"3 33 Charge Hand +1-(723)-613-3031 3797 255724 62716 \n",
"4 66 Projectionist +1-(588)-266-1704 3436 492655 07632 \n",
"\n",
" credit-card-expiration name street-address city \n",
"0 08/20 Le Morton 1296 Young Bend Great Falls \n",
"1 02/20 Mary Pugh 1298 Jamestown Square Mount Clemens \n",
"2 06/16 Glayds Harvey 1070 Doric Drung Rockwall \n",
"3 08/24 Darius Hunt 289 Mabini Nene Bellingham \n",
"4 03/20 Marvin Weber 542 Stanyan Loop Ocala "
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = b.map(flatten).to_dataframe()\n",
"df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
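"We can now perform the same computation as before, this time with Dask DataFrame, which runs pandas operations under the hood. Several occupations are tied at 12 records, so the order of tied entries, and which of them make the top 10, can differ from the Bag result above."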
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"execution": {
"iopub.execute_input": "2021-01-14T10:42:33.952925Z",
"iopub.status.busy": "2021-01-14T10:42:33.952527Z",
"iopub.status.idle": "2021-01-14T10:42:34.610969Z",
"shell.execute_reply": "2021-01-14T10:42:34.611307Z"
}
},
"outputs": [
{
"data": {
"text/plain": [
"Vehicle Assessor 16\n",
"Housewife 14\n",
"Investigator 13\n",
"Town Clerk 13\n",
"Cable TV Installer 13\n",
"Fruiterer 13\n",
"Psychoanalyst 12\n",
"Home Help 12\n",
"Aromatherapist 12\n",
"Lathe Operator 12\n",
"Name: occupation, dtype: int64"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df[df.age > 30].occupation.value_counts().nlargest(10).compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Learn More\n",
"\n",
"You may be interested in the following links:\n",
"\n",
"- [Dask Bag Documentation](https://docs.dask.org/en/latest/bag.html)\n",
"- [API Documentation](http://docs.dask.org/en/latest/bag-api.html)\n",
"- [Dask tutorial](https://github.com/dask/dask-tutorial), notebook 02, for a more in-depth introduction."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}