Live Notebook

You can run this notebook in a live session on Binder or view it on GitHub.

Dask Bags

Dask Bag implements operations like map, filter, groupby, and aggregations on collections of Python objects. It does this in parallel and with a small memory footprint, using Python iterators. It is similar to a parallel version of itertools or to a Pythonic version of the PySpark RDD.
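
For instance, a minimal Bag can be built from an in-memory sequence and processed with the same chained style used throughout this notebook (a small sketch, separate from the data used below):

import dask.bag as db

nums = db.from_sequence(range(10))                      # Build a bag from a Python sequence
nums.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2).sum().compute()  # 0 + 4 + 16 + 36 + 64 = 120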

Dask Bags are often used to do simple preprocessing on log files, JSON records, or other user-defined Python objects.

Full API documentation is available here: http://docs.dask.org/en/latest/bag-api.html

Start Dask Client for Dashboard

Starting the Dask Client is optional. It will provide a dashboard, which is useful for gaining insight into the computation.

The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. It can take some effort to arrange your windows, but seeing both at the same time is very useful when learning.

[1]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1)
client
[1]:

Client

Cluster

  • Workers: 4
  • Cores: 4
  • Memory: 8.36 GB
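
If the client repr above does not render, the dashboard address is also available programmatically (a minimal sketch; the exact URL depends on your environment):

print(client.dashboard_link)   # typically something like http://127.0.0.1:8787/status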

Create Random Data

We create a random set of record data and store it to disk as many JSON files. This will serve as our data for this notebook.

[2]:
import dask
import json
import os

os.makedirs('data', exist_ok=True)              # Create data/ directory

b = dask.datasets.make_people()                 # Make records of people
b.map(json.dumps).to_textfiles('data/*.json')   # Encode as JSON, write to disk
/home/travis/miniconda/envs/test/lib/python3.7/site-packages/fsspec/implementations/local.py:33: FutureWarning: The default value of auto_mkdir=True has been deprecated and will be changed to auto_mkdir=False by default in a future release.
  FutureWarning,
[2]:
['/home/travis/build/dask/dask-examples/data/0.json',
 '/home/travis/build/dask/dask-examples/data/1.json',
 '/home/travis/build/dask/dask-examples/data/2.json',
 '/home/travis/build/dask/dask-examples/data/3.json',
 '/home/travis/build/dask/dask-examples/data/4.json',
 '/home/travis/build/dask/dask-examples/data/5.json',
 '/home/travis/build/dask/dask-examples/data/6.json',
 '/home/travis/build/dask/dask-examples/data/7.json',
 '/home/travis/build/dask/dask-examples/data/8.json',
 '/home/travis/build/dask/dask-examples/data/9.json']

Read JSON data

Now that we have some JSON data in files, let's take a look at it with Dask Bag and the Python json module.

[3]:
!head -n 2 data/0.json
{"age": 28, "name": ["Reginald", "Rocha"], "occupation": "Amusement Arcade Worker", "telephone": "+1-(975)-805-0646", "address": {"address": "412 Pollock Stravenue", "city": "Temple"}, "credit-card": {"number": "5447 7039 8783 0808", "expiration-date": "11/23"}}
{"age": 58, "name": ["Emelda", "Cash"], "occupation": "Shoe Maker", "telephone": "1-930-868-1341", "address": {"address": "40 Guy Wynd", "city": "Tinley Park"}, "credit-card": {"number": "2565 2262 6349 1676", "expiration-date": "11/16"}}
[4]:
import dask.bag as db
import json

b = db.read_text('data/*.json').map(json.loads)
b
[4]:
dask.bag<loads, npartitions=10>
[5]:
b.take(2)
[5]:
({'age': 28,
  'name': ['Reginald', 'Rocha'],
  'occupation': 'Amusement Arcade Worker',
  'telephone': '+1-(975)-805-0646',
  'address': {'address': '412 Pollock Stravenue', 'city': 'Temple'},
  'credit-card': {'number': '5447 7039 8783 0808',
   'expiration-date': '11/23'}},
 {'age': 58,
  'name': ['Emelda', 'Cash'],
  'occupation': 'Shoe Maker',
  'telephone': '1-930-868-1341',
  'address': {'address': '40 Guy Wynd', 'city': 'Tinley Park'},
  'credit-card': {'number': '2565 2262 6349 1676',
   'expiration-date': '11/16'}})

Map, Filter, Aggregate

We can process this data by filtering for records of interest, mapping functions over it to transform our data, and aggregating those results to a total value.

[6]:
b.filter(lambda record: record['age'] > 30).take(2)  # Select only people over 30
[6]:
({'age': 58,
  'name': ['Emelda', 'Cash'],
  'occupation': 'Shoe Maker',
  'telephone': '1-930-868-1341',
  'address': {'address': '40 Guy Wynd', 'city': 'Tinley Park'},
  'credit-card': {'number': '2565 2262 6349 1676',
   'expiration-date': '11/16'}},
 {'age': 47,
  'name': ['Porsche', 'Cervantes'],
  'occupation': 'Producer',
  'telephone': '822.066.4939',
  'address': {'address': '1150 Albion Street', 'city': 'Cicero'},
  'credit-card': {'number': '3735 849041 73624', 'expiration-date': '12/21'}})
[7]:
b.map(lambda record: record['occupation']).take(2)  # Select the occupation field
[7]:
('Amusement Arcade Worker', 'Shoe Maker')
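
Bag also provides pluck as a shorthand for extracting a single field, equivalent to the map above (a small sketch):

b.pluck('occupation').take(2)   # Same result as mapping record['occupation']
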
[8]:
b.count().compute()  # Count total number of records
[8]:
10000

Chain computations

It is common to do many of these steps in one pipeline, only calling compute or take at the end.

[9]:
result = (b.filter(lambda record: record['age'] > 30)
           .map(lambda record: record['occupation'])
           .frequencies(sort=True)
           .topk(10, key=1))
result
[9]:
dask.bag<topk-aggregate, npartitions=1>

As with all lazy Dask collections, we need to call compute to actually evaluate our result. The take method used in earlier examples also triggers computation.

[10]:
result.compute()
[10]:
[('Royal Marine', 16),
 ('Gardener', 15),
 ('Sound Engineer', 15),
 ('Radiologist', 14),
 ('Cabinet Maker', 13),
 ('Florist', 13),
 ('Tennis Coach', 13),
 ('Payroll Clerk', 13),
 ('Patent Attorney', 13),
 ('Playgroup Leader', 13)]
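
When several aggregations are needed, they can be passed to dask.compute together so the underlying files are only read once (a small sketch reusing the bag from above; the variable names are illustrative):

import dask

n_records, mean_age = dask.compute(b.count(), b.pluck('age').mean())  # Evaluate both results in one pass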

Transform and Store

Sometimes we want to compute aggregations as above, but sometimes we want to store results to disk for future analyses. For that we can use methods like to_textfiles and json.dumps, or we can convert to Dask Dataframes and use their storage systems, which we’ll see more of in the next section.

[11]:
(b.filter(lambda record: record['age'] > 30)  # Select records of interest
  .map(json.dumps)                            # Convert Python objects to text
  .to_textfiles('data/processed.*.json'))     # Write to local disk
[11]:
['/home/travis/build/dask/dask-examples/data/processed.0.json',
 '/home/travis/build/dask/dask-examples/data/processed.1.json',
 '/home/travis/build/dask/dask-examples/data/processed.2.json',
 '/home/travis/build/dask/dask-examples/data/processed.3.json',
 '/home/travis/build/dask/dask-examples/data/processed.4.json',
 '/home/travis/build/dask/dask-examples/data/processed.5.json',
 '/home/travis/build/dask/dask-examples/data/processed.6.json',
 '/home/travis/build/dask/dask-examples/data/processed.7.json',
 '/home/travis/build/dask/dask-examples/data/processed.8.json',
 '/home/travis/build/dask/dask-examples/data/processed.9.json']
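
To sanity-check the written files, we can read them back with the same read_text and json.loads pattern used earlier (a small sketch):

db.read_text('data/processed.*.json').map(json.loads).take(1)   # Reload the filtered records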

Convert to Dask Dataframes

Dask Bags are good for reading in initial data, doing a bit of pre-processing, and then handing off to some other more efficient form like Dask Dataframes. Dask Dataframes use Pandas internally, and so can be much faster on numeric data and also have more complex algorithms.

However, Dask Dataframes also expect data that is organized as flat columns. They do not support nested JSON data very well (Bag is better for this).

Here we make a function to flatten down our nested data structure, map that across our records, and then convert that to a Dask Dataframe.

[12]:
b.take(1)
[12]:
({'age': 28,
  'name': ['Reginald', 'Rocha'],
  'occupation': 'Amusement Arcade Worker',
  'telephone': '+1-(975)-805-0646',
  'address': {'address': '412 Pollock Stravenue', 'city': 'Temple'},
  'credit-card': {'number': '5447 7039 8783 0808',
   'expiration-date': '11/23'}},)
[13]:
def flatten(record):
    return {
        'age': record['age'],
        'occupation': record['occupation'],
        'telephone': record['telephone'],
        'credit-card-number': record['credit-card']['number'],
        'credit-card-expiration': record['credit-card']['expiration-date'],
        'name': ' '.join(record['name']),
        'street-address': record['address']['address'],
        'city': record['address']['city']
    }

b.map(flatten).take(1)
[13]:
({'age': 28,
  'occupation': 'Amusement Arcade Worker',
  'telephone': '+1-(975)-805-0646',
  'credit-card-number': '5447 7039 8783 0808',
  'credit-card-expiration': '11/23',
  'name': 'Reginald Rocha',
  'street-address': '412 Pollock Stravenue',
  'city': 'Temple'},)
[14]:
df = b.map(flatten).to_dataframe()
df.head()
[14]:
   age  occupation               telephone          credit-card-number   credit-card-expiration  name               street-address                city
0  28   Amusement Arcade Worker  +1-(975)-805-0646  5447 7039 8783 0808  11/23                   Reginald Rocha     412 Pollock Stravenue         Temple
1  58   Shoe Maker               1-930-868-1341     2565 2262 6349 1676  11/16                   Emelda Cash        40 Guy Wynd                   Tinley Park
2  25   Trade Mark Agent         (848) 933-7527     5274 2728 6258 8194  07/19                   Edwin Guerrero     920 Sunnydale Green           Ansonia
3  47   Producer                 822.066.4939       3735 849041 73624    12/21                   Porsche Cervantes  1150 Albion Street            Cicero
4  53   Show Jumper              (370) 876-1903     5528 3301 5970 5790  08/16                   Alonzo Owen        1379 Bowling Green Extension  Collinsville

We can now perform the same computation as before, this time using Pandas and Dask Dataframe syntax.

[15]:
df[df.age > 30].occupation.value_counts().nlargest(10).compute()
[15]:
Royal Marine                     16
Gardener                         15
Sound Engineer                   15
Radiologist                      14
Florist                          13
Cabinet Maker                    13
Payroll Clerk                    13
Playgroup Leader                 13
Patent Attorney                  13
Aircraft Maintenance Engineer    13
Name: occupation, dtype: int64
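
The flattened dataframe can also use the Dask Dataframe storage methods mentioned earlier. For example, it could be written to Parquet (a sketch; the output path is illustrative and a Parquet engine such as pyarrow or fastparquet must be installed):

df.to_parquet('data/processed.parquet')   # Illustrative path; writes one Parquet file per partition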

Learn More

You may be interested in the following links: