Best Practices
Contents
4.3. Best Practices#
This section is a summary of the official Dask Best Practices.
4.4. Dashboard#
The Dask dashboard is a great tool to debug and monitor applications.
from dask.distributed import Client
client = Client() # start distributed scheduler locally.
client
Client
Client-1fb24e69-acd0-11ed-8986-23ef2bd9ee33
Connection method: Cluster object | Cluster type: distributed.LocalCluster |
Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
f5d923e0
Dashboard: http://127.0.0.1:8787/status | Workers: 2 |
Total threads: 2 | Total memory: 6.78 GiB |
Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-bea7a7bc-57c6-42b8-9ad3-a4953dc67b79
Comm: tcp://127.0.0.1:36299 | Workers: 2 |
Dashboard: http://127.0.0.1:8787/status | Total threads: 2 |
Started: Just now | Total memory: 6.78 GiB |
Workers
Worker: 0
Comm: tcp://127.0.0.1:36881 | Total threads: 1 |
Dashboard: http://127.0.0.1:38245/status | Memory: 3.39 GiB |
Nanny: tcp://127.0.0.1:39195 | |
Local directory: /tmp/dask-worker-space/worker-ozoxcf3n |
Worker: 1
Comm: tcp://127.0.0.1:44953 | Total threads: 1 |
Dashboard: http://127.0.0.1:41807/status | Memory: 3.39 GiB |
Nanny: tcp://127.0.0.1:38687 | |
Local directory: /tmp/dask-worker-space/worker-24tsgzfy |
import random
import time
import dask.array as da
def f(x):
time.sleep(random.random())
return x+1
arr = da.arange(1_000_000, chunks=1000)
arr.map_blocks(f).compute()
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
Cell In[2], line 12
8 return x+1
11 arr = da.arange(1_000_000, chunks=1000)
---> 12 arr.map_blocks(f).compute()
File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/dask/base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
290 def compute(self, **kwargs):
291 """Compute this dask collection
292
293 This turns a lazy Dask collection into its in-memory equivalent.
(...)
312 dask.base.compute
313 """
--> 314 (result,) = compute(self, traverse=False, **kwargs)
315 return result
File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
596 keys.append(x.__dask_keys__())
597 postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/distributed/client.py:3137, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3135 should_rejoin = False
3136 try:
-> 3137 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3138 finally:
3139 for f in futures.values():
File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/distributed/client.py:2306, in Client.gather(self, futures, errors, direct, asynchronous)
2304 else:
2305 local_worker = None
-> 2306 return self.sync(
2307 self._gather,
2308 futures,
2309 errors=errors,
2310 direct=direct,
2311 local_worker=local_worker,
2312 asynchronous=asynchronous,
2313 )
File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/distributed/utils.py:338, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
336 return future
337 else:
--> 338 return sync(
339 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
340 )
File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/distributed/utils.py:401, in sync(loop, func, callback_timeout, *args, **kwargs)
399 else:
400 while not e.is_set():
--> 401 wait(10)
403 if error:
404 typ, exc, tb = error
File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/site-packages/distributed/utils.py:390, in sync.<locals>.wait(timeout)
388 def wait(timeout):
389 try:
--> 390 return e.wait(timeout)
391 except KeyboardInterrupt:
392 loop.add_callback(cancel)
File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/threading.py:607, in Event.wait(self, timeout)
605 signaled = self._flag
606 if not signaled:
--> 607 signaled = self._cond.wait(timeout)
608 return signaled
File /opt/hostedtoolcache/Python/3.10.9/x64/lib/python3.10/threading.py:324, in Condition.wait(self, timeout)
322 else:
323 if timeout > 0:
--> 324 gotit = waiter.acquire(True, timeout)
325 else:
326 gotit = waiter.acquire(False)
KeyboardInterrupt:
4.5. Handling Computation#
4.5.1. Stop Using Dask When No Longer Needed#
Many workloads read large amount of data, reduce it down, then iterate it. For the last step, it makes sense to stop using Dask and use normal Python again.
import dask
df = dask.datasets.timeseries() # Read data
df = df.groupby("name").mean() # Reduce data significantly
df = df.compute() # Continue with pandas/Numpy
4.5.2. Avoid Calling compute
Repeatedly#
Compute related results with shared computations in a single dask.compute()
call
# Don't repeatedly call compute
df = dd.read_csv("...")
xmin = df.x.min().compute()
xmax = df.x.max().compute()
# Do compute multiple results at the same time
df = dd.read_csv("...")
xmin, xmax = dask.compute(df.x.min(), df.x.max())
This allows Dask to compute the shared parts of the computation (like the dd.read_csv
call above) only once, rather than once per compute
call.
4.5.3. Fusing Custom Computations#
While the high level Dask collections (array, DataFrame, bag) offers common operations, they might not accommodate more complex operations. Dask offers multiple low-level functions to handle complex workflows.
All collections have a map_partitions
or map_blocks
function, which apply a custom function to each partitions/blocks.
Function |
Description |
---|---|
|
Apply Python function on each DataFrame partition. (documentation) |
|
Apply a function to each partition, sharing rows with adjacent partitions. (documentation) |
|
User defined groupby-aggregation. (documentation) |
|
Tensor operation: Generalized inner and outer products (documentation) |
|
Map a function across all blocks of a dask array. (documentation) |
|
Map a function over blocks of arrays with some overlap (documentation) |
|
General version of reductions (documentation) |
import dask
df = dask.datasets.timeseries()
df
name | id | x | y | |
---|---|---|---|---|
npartitions=30 | ||||
2000-01-01 | object | int64 | float64 | float64 |
2000-01-02 | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2000-01-30 | ... | ... | ... | ... |
2000-01-31 | ... | ... | ... | ... |
import dask.dataframe as dd
custom_mean = dd.Aggregation(
name="custom_mean",
chunk=lambda c: (c.count(), c.sum()),
agg=lambda count, sum_: (count.count(), sum_.sum()),
finalize=lambda count, sum_: sum_ / count,
)
df.groupby("name").agg(custom_mean).head()
id | x | y | |
---|---|---|---|
name | |||
Edith | 24989279.25 | 18.552317 | 68.150218 |
Hannah | 25055749.50 | 119.335195 | 19.475616 |
Oliver | 24773609.75 | -18.262402 | 20.378895 |
Sarah | 24910480.50 | -8.921351 | 12.455201 |
Victor | 24973653.00 | 38.036462 | 38.836490 |
4.6. Handling Data#
4.6.1. Loading Data#
Dask should be used to create large object from data. It is a common anti-pattern to create those large Python objects outside of Dask.
DataFrames:
# Don't
ddf = ... a dask dataframe ...
for fn in filenames:
df = pandas.read_csv(fn) # Read locally with pandas
ddf = ddf.append(df) # Give to Dask
# Do
ddf = dd.read_csv(filenames)
Arrays:
# Don't
f = h5py.File(...)
x = np.asarray(f["x"]) # Get data as a NumPy array locally
x = da.from_array(x) # Hand NumPy array to Dask
# Do
f = h5py.File(...)
x = da.from_array(f["x"]) # Let Dask do the reading
4.6.2. Persisting Data#
Accessing data from RAM is much faster than from storage. You should persist clean data that both:
Fits in memory
Will be used for many analysis
import dask
# Read your data
df = dask.datasets.timeseries()
# Apply some filter
df = df[df["name"] != "Alice"]
# Persist the costly aggregation.
by_name = df.groupby("name")
dask.persist(by_name)
(<dask.dataframe.groupby.DataFrameGroupBy at 0x7f66b0530940>,)
by_name.sum().head()
by_name.mean().head()
by_name.std().head()
id | x | y | |
---|---|---|---|
name | |||
Bob | 31.583648 | 0.577602 | 0.579210 |
Charlie | 31.622393 | 0.577409 | 0.575748 |
Dan | 31.608507 | 0.577193 | 0.578075 |
Edith | 31.676665 | 0.577142 | 0.577230 |
Frank | 31.561095 | 0.577049 | 0.577963 |
4.6.3. Scatter Data#
Sometimes, an objects needs to be available to most workers to perform a computation. You can scatter (or broadcast) the data on the cluster.
For example, this is useful when joining a small dataset to a larger one. Without scattering the data, you would have to shuffle the larger dataset across the cluster; a costly operation.
Smaller broadcast variable:
def func(partition, d):
return ...
my_dict = {...}
b = b.map(func, d=my_dict)
Large broadcast variable:
my_dict = dask.delayed(my_dict)
b = b.map(func, d=my_dict)
Very large broadcast variable:
[my_dict] = client.scatter([my_dict])
b = b.map(func, d=my_dict)
4.6.4. Optimized File Format#
Efficient storage formats for different data structures allow faster access and lower storage cost. Moreover, these formats often store metadata, which remain accessible even if reading lazily.
Some example include:
4.7. Avoid Large Chunks#
Dask workers will generally have as many chunks in memory as the number of core available. Moreover, it is common for a worker to have 2-3 chunks available so it always has something to work on. Keeping that in mind, if your workers have 4 cores and 1GB chunks, the computation will require at least 4GB. Therefore, large chunks should be avoided to always keep your worker busy and without running out of memory.
Very small chunks should also be avoided as discussed below.
See this blog post on Choosing good chunk size.
4.8. Avoid Large Graphs#
Every operation performed on a chunks will result in a tasks. While the overhead from tasks scheduling is small (usually 200us to 1ms), this can take substantial amount of time for applications with millions of tasks.
You can build smaller graphs by:
Increasing your chunk size
Fusing operation together
Breaking down your computation
4.9. Processes vs Threads#
If you’re mostly using Numpy, pandas, Scikit-learn, Numba, or other libraries releasing the GIL, then your worker should use threads. If you’re working with Python object, then processes should be used.
On larger machines (10+ threads), you should probably have a few processes each with multiple threads. Threading performance is only efficient up to a certain point.
For information to configure threads, processes, Dask scheduler, see the scheduler documentation.