4.3. Best Practices#

This section is a summary of the official Dask Best Practices.

4.4. Dashboard#

The Dask dashboard is a great tool to debug and monitor applications.

from dask.distributed import Client
client = Client()  # start distributed scheduler locally.
client

Client

Client-1fb24e69-acd0-11ed-8986-23ef2bd9ee33

Connection method: Cluster object Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info

import random
import time

import dask.array as da

def f(x):
    time.sleep(random.random())
    return x+1
    

arr = da.arange(1_000_000, chunks=1000)
arr.map_blocks(f).compute()
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[2], line 12
      8     return x+1
     11 arr = da.arange(1_000_000, chunks=1000)
---> 12 arr.map_blocks(f).compute()

File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/dask/base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
    290 def compute(self, **kwargs):
    291     """Compute this dask collection
    292 
    293     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    312     dask.base.compute
    313     """
--> 314     (result,) = compute(self, traverse=False, **kwargs)
    315     return result

File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    596     keys.append(x.__dask_keys__())
    597     postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
    600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/distributed/client.py:3137, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3135         should_rejoin = False
   3136 try:
-> 3137     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3138 finally:
   3139     for f in futures.values():

File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/distributed/client.py:2306, in Client.gather(self, futures, errors, direct, asynchronous)
   2304 else:
   2305     local_worker = None
-> 2306 return self.sync(
   2307     self._gather,
   2308     futures,
   2309     errors=errors,
   2310     direct=direct,
   2311     local_worker=local_worker,
   2312     asynchronous=asynchronous,
   2313 )

File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/distributed/utils.py:338, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    336     return future
    337 else:
--> 338     return sync(
    339         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    340     )

File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/distributed/utils.py:401, in sync(loop, func, callback_timeout, *args, **kwargs)
    399 else:
    400     while not e.is_set():
--> 401         wait(10)
    403 if error:
    404     typ, exc, tb = error

File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/distributed/utils.py:390, in sync.<locals>.wait(timeout)
    388 def wait(timeout):
    389     try:
--> 390         return e.wait(timeout)
    391     except KeyboardInterrupt:
    392         loop.add_callback(cancel)

File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/threading.py:607, in Event.wait(self, timeout)
    605 signaled = self._flag
    606 if not signaled:
--> 607     signaled = self._cond.wait(timeout)
    608 return signaled

File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/threading.py:324, in Condition.wait(self, timeout)
    322 else:
    323     if timeout > 0:
--> 324         gotit = waiter.acquire(True, timeout)
    325     else:
    326         gotit = waiter.acquire(False)

KeyboardInterrupt: 

4.5. Handling Computation#

4.5.1. Stop Using Dask When No Longer Needed#

Many workloads read large amount of data, reduce it down, then iterate it.
For the last step, it makes sense to stop using Dask and use normal Python again.

import dask

df = dask.datasets.timeseries()  # Read data
df = df.groupby("name").mean()   # Reduce data significantly
df = df.compute()                # Continue with pandas/Numpy

4.5.2. Avoid Calling compute Repeatedly#

Compute related results with shared computations in a single dask.compute() call

# Don't repeatedly call compute

df = dd.read_csv("...")
xmin = df.x.min().compute()
xmax = df.x.max().compute()
# Do compute multiple results at the same time

df = dd.read_csv("...")

xmin, xmax = dask.compute(df.x.min(), df.x.max())

This allows Dask to compute the shared parts of the computation (like the dd.read_csv call above) only once, rather than once per compute call.

4.5.3. Fusing Custom Computations#

While the high level Dask collections (array, DataFrame, bag) offers common operations, they might not accommodate more complex operations.
Dask offers multiple low-level functions to handle complex workflows.

All collections have a map_partitions or map_blocks function, which apply a custom function to each partitions/blocks.

Function

Description

map_partitions(func, *args[, meta, …])

Apply Python function on each DataFrame partition. (documentation)

rolling.map_overlap(func, df, before, after, …)

Apply a function to each partition, sharing rows with adjacent partitions. (documentation)

groupby.Aggregation(name, chunk, agg[, finalize])

User defined groupby-aggregation. (documentation)

blockwise(func, out_ind, *args[, name, …])

Tensor operation: Generalized inner and outer products (documentation)

map_blocks(func, *args[, name, token, …])

Map a function across all blocks of a dask array. (documentation)

map_overlap(func, *args[, depth, boundary, …])

Map a function over blocks of arrays with some overlap (documentation)

reduction(x, chunk, aggregate[, axis, …])

General version of reductions (documentation)

import dask

df = dask.datasets.timeseries()
df
Dask DataFrame Structure:
name id x y
npartitions=30
2000-01-01 object int64 float64 float64
2000-01-02 ... ... ... ...
... ... ... ... ...
2000-01-30 ... ... ... ...
2000-01-31 ... ... ... ...
Dask Name: make-timeseries, 1 graph layer
import dask.dataframe as dd

custom_mean = dd.Aggregation(
    name="custom_mean",
    chunk=lambda c: (c.count(), c.sum()),
    agg=lambda count, sum_: (count.count(), sum_.sum()),
    finalize=lambda count, sum_: sum_ / count,
)

df.groupby("name").agg(custom_mean).head()
id x y
name
Edith 24989279.25 18.552317 68.150218
Hannah 25055749.50 119.335195 19.475616
Oliver 24773609.75 -18.262402 20.378895
Sarah 24910480.50 -8.921351 12.455201
Victor 24973653.00 38.036462 38.836490

4.6. Handling Data#

4.6.1. Loading Data#

Dask should be used to create large object from data. It is a common anti-pattern to create those large Python objects outside of Dask.

DataFrames:

# Don't

ddf = ... a dask dataframe ...
for fn in filenames:
    df = pandas.read_csv(fn)  # Read locally with pandas
    ddf = ddf.append(df)            # Give to Dask

# Do

ddf = dd.read_csv(filenames)

Arrays:

# Don't

f = h5py.File(...)
x = np.asarray(f["x"])  # Get data as a NumPy array locally

x = da.from_array(x)  # Hand NumPy array to Dask
# Do

f = h5py.File(...)
x = da.from_array(f["x"])  # Let Dask do the reading

4.6.2. Persisting Data#

Accessing data from RAM is much faster than from storage. You should persist clean data that both:

  • Fits in memory

  • Will be used for many analysis

import dask

# Read your data
df = dask.datasets.timeseries()

# Apply some filter
df = df[df["name"] != "Alice"]

# Persist the costly aggregation.
by_name = df.groupby("name")
dask.persist(by_name)
(<dask.dataframe.groupby.DataFrameGroupBy at 0x7f66b0530940>,)
by_name.sum().head()
by_name.mean().head()
by_name.std().head()
id x y
name
Bob 31.583648 0.577602 0.579210
Charlie 31.622393 0.577409 0.575748
Dan 31.608507 0.577193 0.578075
Edith 31.676665 0.577142 0.577230
Frank 31.561095 0.577049 0.577963

4.6.3. Scatter Data#

Sometimes, an objects needs to be available to most workers to perform a computation.
You can scatter (or broadcast) the data on the cluster.

For example, this is useful when joining a small dataset to a larger one. Without scattering the data, you would have to shuffle the larger dataset across the cluster; a costly operation.

Smaller broadcast variable:

def func(partition, d):
    return ...

my_dict = {...}

b = b.map(func, d=my_dict)

Large broadcast variable:

my_dict = dask.delayed(my_dict)

b = b.map(func, d=my_dict)

Very large broadcast variable:

[my_dict] = client.scatter([my_dict])

b = b.map(func, d=my_dict)

4.6.4. Optimized File Format#

Efficient storage formats for different data structures allow faster access and lower storage cost.
Moreover, these formats often store metadata, which remain accessible even if reading lazily.

Some example include:

4.7. Avoid Large Chunks#

Dask workers will generally have as many chunks in memory as the number of core available. Moreover, it is common for a worker to have 2-3 chunks available so it always has something to work on.
Keeping that in mind, if your workers have 4 cores and 1GB chunks, the computation will require at least 4GB. Therefore, large chunks should be avoided to always keep your worker busy and without running out of memory.

Very small chunks should also be avoided as discussed below.

See this blog post on Choosing good chunk size.

4.8. Avoid Large Graphs#

Every operation performed on a chunks will result in a tasks. While the overhead from tasks scheduling is small (usually 200us to 1ms), this can take substantial amount of time for applications with millions of tasks.

You can build smaller graphs by:

  • Increasing your chunk size

  • Fusing operation together

  • Breaking down your computation

4.9. Processes vs Threads#

If you’re mostly using Numpy, pandas, Scikit-learn, Numba, or other libraries releasing the GIL, then your worker should use threads.
If you’re working with Python object, then processes should be used.

On larger machines (10+ threads), you should probably have a few processes each with multiple threads. Threading performance is only efficient up to a certain point.

For information to configure threads, processes, Dask scheduler, see the scheduler documentation.