Dask Bags¶
Dask Bag implements operations like map, filter, groupby, and aggregations on collections of Python objects. It does this in parallel and in small memory using Python iterators. It is similar to a parallel version of itertools or a Pythonic version of the PySpark RDD.
Dask Bags are often used to do simple preprocessing on log files, JSON records, or other user-defined Python objects.
Full API documentation is available here: http://docs.dask.org/en/latest/bag-api.html
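As a quick illustration of these operations before we work with files (a minimal sketch using made-up in-memory data, not this notebook's dataset), a Bag can also be built directly from a Python sequence:

```python
import dask.bag as db

# Build a Bag from an in-memory sequence, split into 2 partitions
b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)

# Lazily keep the even numbers and square them, then compute the result
result = b.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2).compute()
print(result)  # [4, 16, 36]
```

Nothing runs until `compute()` is called; the `filter` and `map` calls only build up a task graph.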
Start Dask Client for Dashboard¶
Starting the Dask Client is optional. It will provide a dashboard, which is useful for gaining insight into the computation.
The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. Arranging your windows this way can take some effort, but seeing them both at the same time is very useful when learning.
[1]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1)
client
[1]:
Client
Client-db1fb37f-0ddf-11ed-9823-000d3a8f7959
Connection method: Cluster object | Cluster type: distributed.LocalCluster |
Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
fce50585
Dashboard: http://127.0.0.1:8787/status | Workers: 4 |
Total threads: 4 | Total memory: 6.78 GiB |
Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-6db95d18-fecb-4e65-8bb9-c1ecfb644d25
Comm: tcp://127.0.0.1:37071 | Workers: 4 |
Dashboard: http://127.0.0.1:8787/status | Total threads: 4 |
Started: Just now | Total memory: 6.78 GiB |
Workers
Worker: 0
Comm: tcp://127.0.0.1:38523 | Total threads: 1 |
Dashboard: http://127.0.0.1:33803/status | Memory: 1.70 GiB |
Nanny: tcp://127.0.0.1:46261 | |
Local directory: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-z2ivdlv5 |
Worker: 1
Comm: tcp://127.0.0.1:38005 | Total threads: 1 |
Dashboard: http://127.0.0.1:43833/status | Memory: 1.70 GiB |
Nanny: tcp://127.0.0.1:36091 | |
Local directory: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-dknui8s8 |
Worker: 2
Comm: tcp://127.0.0.1:36089 | Total threads: 1 |
Dashboard: http://127.0.0.1:46853/status | Memory: 1.70 GiB |
Nanny: tcp://127.0.0.1:45553 | |
Local directory: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-ei436jch |
Worker: 3
Comm: tcp://127.0.0.1:34709 | Total threads: 1 |
Dashboard: http://127.0.0.1:35105/status | Memory: 1.70 GiB |
Nanny: tcp://127.0.0.1:40747 | |
Local directory: /home/runner/work/dask-examples/dask-examples/dask-worker-space/worker-uvqwrfkf |
Create Random Data¶
We create a random set of record data and store it to disk as many JSON files. This will serve as our data for this notebook.
[2]:
import dask
import json
import os
os.makedirs('data', exist_ok=True) # Create data/ directory
b = dask.datasets.make_people() # Make records of people
b.map(json.dumps).to_textfiles('data/*.json') # Encode as JSON, write to disk
[2]:
['/home/runner/work/dask-examples/dask-examples/data/0.json',
'/home/runner/work/dask-examples/dask-examples/data/1.json',
'/home/runner/work/dask-examples/dask-examples/data/2.json',
'/home/runner/work/dask-examples/dask-examples/data/3.json',
'/home/runner/work/dask-examples/dask-examples/data/4.json',
'/home/runner/work/dask-examples/dask-examples/data/5.json',
'/home/runner/work/dask-examples/dask-examples/data/6.json',
'/home/runner/work/dask-examples/dask-examples/data/7.json',
'/home/runner/work/dask-examples/dask-examples/data/8.json',
'/home/runner/work/dask-examples/dask-examples/data/9.json']
Read JSON data¶
Now that we have some JSON data in files, let's take a look at it with Dask Bag and the Python json module.
[3]:
!head -n 2 data/0.json
{"age": 61, "name": ["Emiko", "Oliver"], "occupation": "Medical Student", "telephone": "166.814.5565", "address": {"address": "645 Drumm Line", "city": "Kennewick"}, "credit-card": {"number": "3792 459318 98518", "expiration-date": "12/23"}}
{"age": 54, "name": ["Wendolyn", "Ortega"], "occupation": "Tractor Driver", "telephone": "1-975-090-1672", "address": {"address": "1274 Harbor Court", "city": "Mustang"}, "credit-card": {"number": "4600 5899 6829 6887", "expiration-date": "11/25"}}
[4]:
import dask.bag as db
import json
b = db.read_text('data/*.json').map(json.loads)
b
[4]:
dask.bag<loads, npartitions=10>
[5]:
b.take(2)
[5]:
({'age': 61,
'name': ['Emiko', 'Oliver'],
'occupation': 'Medical Student',
'telephone': '166.814.5565',
'address': {'address': '645 Drumm Line', 'city': 'Kennewick'},
'credit-card': {'number': '3792 459318 98518', 'expiration-date': '12/23'}},
{'age': 54,
'name': ['Wendolyn', 'Ortega'],
'occupation': 'Tractor Driver',
'telephone': '1-975-090-1672',
'address': {'address': '1274 Harbor Court', 'city': 'Mustang'},
'credit-card': {'number': '4600 5899 6829 6887',
'expiration-date': '11/25'}})
Map, Filter, Aggregate¶
We can process this data by filtering for records of interest, mapping functions over it to transform our data, and aggregating those results to a total value.
[6]:
b.filter(lambda record: record['age'] > 30).take(2) # Select only people over 30
[6]:
({'age': 61,
'name': ['Emiko', 'Oliver'],
'occupation': 'Medical Student',
'telephone': '166.814.5565',
'address': {'address': '645 Drumm Line', 'city': 'Kennewick'},
'credit-card': {'number': '3792 459318 98518', 'expiration-date': '12/23'}},
{'age': 54,
'name': ['Wendolyn', 'Ortega'],
'occupation': 'Tractor Driver',
'telephone': '1-975-090-1672',
'address': {'address': '1274 Harbor Court', 'city': 'Mustang'},
'credit-card': {'number': '4600 5899 6829 6887',
'expiration-date': '11/25'}})
[7]:
b.map(lambda record: record['occupation']).take(2) # Select the occupation field
[7]:
('Medical Student', 'Tractor Driver')
[8]:
b.count().compute() # Count total number of records
[8]:
10000
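Other aggregations such as `pluck`, `sum`, and `mean` work the same way. For example (a small sketch with made-up in-memory records, not the generated data above):

```python
import dask.bag as db

# Hypothetical records for illustration only
records = [{'age': 25}, {'age': 35}, {'age': 45}]
b2 = db.from_sequence(records, npartitions=2)

# pluck extracts one field from each record; mean aggregates the values
mean_age = b2.pluck('age').mean().compute()
print(mean_age)  # 35.0
```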
Chain computations¶
It is common to do many of these steps in one pipeline, only calling compute or take at the end.
[9]:
result = (b.filter(lambda record: record['age'] > 30)
.map(lambda record: record['occupation'])
.frequencies(sort=True)
.topk(10, key=1))
result
[9]:
dask.bag<topk-aggregate, npartitions=1>
As with all lazy Dask collections, we need to call compute to actually evaluate our result. The take method used in earlier examples also triggers computation.
[10]:
result.compute()
[10]:
[('Merchant', 16),
('Coroner', 14),
('Book Binder', 13),
('Medical Practitioner', 13),
('Payroll Supervisor', 13),
('Telecommunications', 13),
('Thermal Insulator', 13),
('Pattern Maker', 12),
('Advertising Executive', 12),
('Insurance Staff', 12)]
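Because the records above are randomly generated, the exact counts will vary between runs. The frequencies/topk pattern itself can be checked on a small deterministic example (hypothetical data for illustration):

```python
import dask.bag as db

words = ['cat', 'dog', 'cat', 'bird', 'cat', 'dog']
b3 = db.from_sequence(words, npartitions=2)

# frequencies counts occurrences of each distinct value;
# topk keeps the k pairs with the largest count (key=1 sorts by count)
top = b3.frequencies(sort=True).topk(2, key=1).compute()
print(top)  # [('cat', 3), ('dog', 2)]
```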
Transform and Store¶
Sometimes we want to compute aggregations as above, but sometimes we want to store results to disk for future analyses. For that we can use methods like to_textfiles and json.dumps, or we can convert to Dask Dataframes and use their storage systems, which we'll see more of in the next section.
[11]:
(b.filter(lambda record: record['age'] > 30) # Select records of interest
.map(json.dumps) # Convert Python objects to text
.to_textfiles('data/processed.*.json')) # Write to local disk
[11]:
['/home/runner/work/dask-examples/dask-examples/data/processed.0.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.1.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.2.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.3.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.4.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.5.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.6.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.7.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.8.json',
'/home/runner/work/dask-examples/dask-examples/data/processed.9.json']
Convert to Dask Dataframes¶
Dask Bags are good for reading in initial data, doing a bit of pre-processing, and then handing off to some other more efficient form like Dask Dataframes. Dask Dataframes use Pandas internally, and so can be much faster on numeric data and also have more complex algorithms.
However, Dask Dataframes expect data organized as flat columns. They do not support nested JSON data very well (Bag is better for this).
Here we make a function to flatten down our nested data structure, map that across our records, and then convert that to a Dask Dataframe.
[12]:
b.take(1)
[12]:
({'age': 61,
'name': ['Emiko', 'Oliver'],
'occupation': 'Medical Student',
'telephone': '166.814.5565',
'address': {'address': '645 Drumm Line', 'city': 'Kennewick'},
'credit-card': {'number': '3792 459318 98518', 'expiration-date': '12/23'}},)
[13]:
def flatten(record):
return {
'age': record['age'],
'occupation': record['occupation'],
'telephone': record['telephone'],
'credit-card-number': record['credit-card']['number'],
'credit-card-expiration': record['credit-card']['expiration-date'],
'name': ' '.join(record['name']),
'street-address': record['address']['address'],
'city': record['address']['city']
}
b.map(flatten).take(1)
[13]:
({'age': 61,
'occupation': 'Medical Student',
'telephone': '166.814.5565',
'credit-card-number': '3792 459318 98518',
'credit-card-expiration': '12/23',
'name': 'Emiko Oliver',
'street-address': '645 Drumm Line',
'city': 'Kennewick'},)
[14]:
df = b.map(flatten).to_dataframe()
df.head()
[14]:
age | occupation | telephone | credit-card-number | credit-card-expiration | name | street-address | city | |
---|---|---|---|---|---|---|---|---|
0 | 61 | Medical Student | 166.814.5565 | 3792 459318 98518 | 12/23 | Emiko Oliver | 645 Drumm Line | Kennewick |
1 | 54 | Tractor Driver | 1-975-090-1672 | 4600 5899 6829 6887 | 11/25 | Wendolyn Ortega | 1274 Harbor Court | Mustang |
2 | 33 | Doctor | 107-044-4885 | 3464 081512 23342 | 03/20 | Alvin Rich | 1242 Vidal Plantation | Wyandotte |
3 | 34 | Counsellor | 219-748-6795 | 4018 1801 8111 7757 | 08/23 | Toccara Rogers | 252 Sampson Drive | Parma Heights |
4 | 34 | Graphic Designer | 1-509-313-7125 | 4886 7380 4681 0434 | 05/18 | Randal Roberts | 767 Telegraph Side road | New York |
We can now perform the same computation as before, but now using Pandas and Dask dataframe.
[15]:
df[df.age > 30].occupation.value_counts().nlargest(10).compute()
[15]:
Merchant 16
Coroner 14
Thermal Insulator 13
Book Binder 13
Payroll Supervisor 13
Medical Practitioner 13
Telecommunications 13
Optometrist 12
Advertising Assistant 12
Care Manager 12
Name: occupation, dtype: int64
Learn More¶
You may be interested in the following links:
Dask tutorial, notebook 02, for a more in-depth introduction.