# Best Practices

This section is a summary of the [official Dask Best Practices](https://docs.dask.org/en/stable/best-practices.html).

# Dashboard

The [Dask dashboard](https://docs.dask.org/en/stable/dashboard.html) is a great tool to debug and monitor applications.

In [1]:
from dask.distributed import Client
client = Client()  # start distributed scheduler locally.
client

2023-02-14 19:34:14,653 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-rghli8_x', purging
2023-02-14 19:34:14,653 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-oex76xjk', purging
2023-02-14 19:34:14,653 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-0308cuzr', purging
2023-02-14 19:34:14,654 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-vkz9v6i0', purging
2023-02-14 19:34:14,654 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-lcamozfp', purging
2023-02-14 19:34:14,654 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-0jaeddsa', purging


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 24,Total memory: 31.32 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:44263,Workers: 6
Dashboard: http://127.0.0.1:8787/status,Total threads: 24
Started: Just now,Total memory: 31.32 GiB

0,1
Comm: tcp://127.0.0.1:44785,Total threads: 4
Dashboard: http://127.0.0.1:33163/status,Memory: 5.22 GiB
Nanny: tcp://127.0.0.1:38107,
Local directory: /tmp/dask-worker-space/worker-_c8x9oc0,Local directory: /tmp/dask-worker-space/worker-_c8x9oc0

0,1
Comm: tcp://127.0.0.1:36109,Total threads: 4
Dashboard: http://127.0.0.1:37257/status,Memory: 5.22 GiB
Nanny: tcp://127.0.0.1:35249,
Local directory: /tmp/dask-worker-space/worker-wwflm5fb,Local directory: /tmp/dask-worker-space/worker-wwflm5fb

0,1
Comm: tcp://127.0.0.1:42539,Total threads: 4
Dashboard: http://127.0.0.1:37003/status,Memory: 5.22 GiB
Nanny: tcp://127.0.0.1:40833,
Local directory: /tmp/dask-worker-space/worker-achbs4g5,Local directory: /tmp/dask-worker-space/worker-achbs4g5

0,1
Comm: tcp://127.0.0.1:38295,Total threads: 4
Dashboard: http://127.0.0.1:42121/status,Memory: 5.22 GiB
Nanny: tcp://127.0.0.1:36989,
Local directory: /tmp/dask-worker-space/worker-tp8ybf1y,Local directory: /tmp/dask-worker-space/worker-tp8ybf1y

0,1
Comm: tcp://127.0.0.1:34687,Total threads: 4
Dashboard: http://127.0.0.1:34207/status,Memory: 5.22 GiB
Nanny: tcp://127.0.0.1:42351,
Local directory: /tmp/dask-worker-space/worker-lx14z7jh,Local directory: /tmp/dask-worker-space/worker-lx14z7jh

0,1
Comm: tcp://127.0.0.1:40915,Total threads: 4
Dashboard: http://127.0.0.1:42191/status,Memory: 5.22 GiB
Nanny: tcp://127.0.0.1:43815,
Local directory: /tmp/dask-worker-space/worker-ywd6a1_z,Local directory: /tmp/dask-worker-space/worker-ywd6a1_z


In [2]:
import random
import time

import dask.array as da

def f(x):
    time.sleep(random.random())
    return x+1
    

arr = da.arange(1_000_000, chunks=1000)
arr.map_blocks(f).compute()

array([      1,       2,       3, ...,  999998,  999999, 1000000])

# Handling Computation

## Stop Using Dask When No Longer Needed

Many workloads read large amount of data, reduce it down, then iterate it.
</br>For the last step, it makes sense to stop using Dask and use normal Python again.

In [3]:
import dask

df = dask.datasets.timeseries()  # Read data
df = df.groupby("name").mean()   # Reduce data significantly
df = df.compute()                # Continue with pandas/Numpy

## Avoid Calling `compute` Repeatedly

Compute related results with shared computations in a single `dask.compute()` call

```python
# Don't repeatedly call compute

df = dd.read_csv("...")
xmin = df.x.min().compute()
xmax = df.x.max().compute()
```

```python
# Do compute multiple results at the same time

df = dd.read_csv("...")

xmin, xmax = dask.compute(df.x.min(), df.x.max())
```
This allows Dask to compute the shared parts of the computation (like the `dd.read_csv` call above) only once, rather than once per `compute` call.



## Fusing Custom Computations

While the high level Dask collections (array, DataFrame, bag) offers common operations, they might not accommodate more complex operations.
</br>Dask offers multiple low-level functions to handle complex workflows.

All collections have a `map_partitions` or `map_blocks` function, which apply a custom function to each partitions/blocks.

|**Function**|**Description**|
|:-----------|:--------------|
|`map_partitions`(func, *args[, meta, ...])|Apply Python function on each DataFrame partition. ([documentation](https://docs.dask.org/en/stable/generated/dask.dataframe.map_partitions.html#dask.dataframe.map_partitions))|
|`rolling.map_overlap`(func, df, before, after, ...)|Apply a function to each partition, sharing rows with adjacent partitions. ([documentation](https://docs.dask.org/en/stable/generated/dask.dataframe.rolling.map_overlap.html#dask.dataframe.rolling.map_overlap))|
|`groupby.Aggregation`(name, chunk, agg[, finalize])|User defined groupby-aggregation. ([documentation](https://docs.dask.org/en/stable/generated/dask.dataframe.groupby.Aggregation.html#dask.dataframe.groupby.Aggregation))|
|`blockwise`(func, out_ind, *args[, name, ...])|Tensor operation: Generalized inner and outer products ([documentation](https://docs.dask.org/en/stable/generated/dask.array.blockwise.html#dask.array.blockwise))|
|`map_blocks`(func, *args[, name, token, ...])|Map a function across all blocks of a dask array. ([documentation](https://docs.dask.org/en/stable/generated/dask.array.map_blocks.html#dask.array.map_blocks))|
|`map_overlap`(func, *args[, depth, boundary, ...])|Map a function over blocks of arrays with some overlap ([documentation](https://docs.dask.org/en/stable/generated/dask.array.map_overlap.html#dask.array.map_overlap))|
|`reduction`(x, chunk, aggregate[, axis, ...])|General version of reductions ([documentation](https://docs.dask.org/en/stable/generated/dask.array.reduction.html#dask.array.reduction))|


In [4]:
import dask

df = dask.datasets.timeseries()
df

Unnamed: 0_level_0,name,id,x,y
npartitions=30,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2000-01-01,object,int64,float64,float64
2000-01-02,...,...,...,...
...,...,...,...,...
2000-01-30,...,...,...,...
2000-01-31,...,...,...,...


In [5]:
import dask.dataframe as dd

custom_mean = dd.Aggregation(
    name="custom_mean",
    chunk=lambda c: (c.count(), c.sum()),
    agg=lambda count, sum_: (count.count(), sum_.sum()),
    finalize=lambda count, sum_: sum_ / count,
)

df.groupby("name").agg(custom_mean).head()

Unnamed: 0_level_0,id,x,y
name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Edith,24989279.25,18.552317,68.150218
Hannah,25055749.5,119.335195,19.475616
Oliver,24773609.75,-18.262402,20.378895
Sarah,24910480.5,-8.921351,12.455201
Victor,24973653.0,38.036462,38.83649


# Handling Data

## Loading Data

Dask should be used to create large object from data. It is a common anti-pattern to create those large Python objects outside of Dask.


DataFrames:
```python
# Don't

ddf = ... a dask dataframe ...
for fn in filenames:
    df = pandas.read_csv(fn)  # Read locally with pandas
    ddf = ddf.append(df)            # Give to Dask

```

```python
# Do

ddf = dd.read_csv(filenames)
```

Arrays:
```python
# Don't

f = h5py.File(...)
x = np.asarray(f["x"])  # Get data as a NumPy array locally

x = da.from_array(x)  # Hand NumPy array to Dask
```

```python
# Do

f = h5py.File(...)
x = da.from_array(f["x"])  # Let Dask do the reading
```

## Persisting Data

Accessing data from RAM is much faster than from storage. You should persist clean data that both:
- Fits in memory
- Will be used for many analysis

In [6]:
import dask

# Read your data
df = dask.datasets.timeseries()

# Apply some filter
df = df[df["name"] != "Alice"]

# Persist the costly aggregation.
by_name = df.groupby("name")
dask.persist(by_name)

(<dask.dataframe.groupby.DataFrameGroupBy at 0x7f66b0530940>,)

In [7]:
by_name.sum().head()
by_name.mean().head()
by_name.std().head()

Unnamed: 0_level_0,id,x,y
name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Bob,31.583648,0.577602,0.57921
Charlie,31.622393,0.577409,0.575748
Dan,31.608507,0.577193,0.578075
Edith,31.676665,0.577142,0.57723
Frank,31.561095,0.577049,0.577963


## Scatter Data

Sometimes, an objects needs to be available to most workers to perform a computation.
</br>You can scatter (or broadcast) the data on the cluster.

For example, this is useful when joining a small dataset to a larger one. Without scattering the data, you would have to shuffle the larger dataset across the cluster; a costly operation.

Smaller broadcast variable:
```python
def func(partition, d):
    return ...

my_dict = {...}

b = b.map(func, d=my_dict)
```

Large broadcast variable:
```python
my_dict = dask.delayed(my_dict)

b = b.map(func, d=my_dict)
```

Very large broadcast variable:
```python
[my_dict] = client.scatter([my_dict])

b = b.map(func, d=my_dict)
```

## Optimized File Format

Efficient storage formats for different data structures allow faster access and lower storage cost.
</br>Moreover, these formats often store metadata, which remain accessible even if reading lazily.

Some example include:
- DataFrames: [Parquet](https://parquet.apache.org/)
- Arrays: [Zarr](https://zarr.readthedocs.io/en/stable/)

# Avoid Large Chunks

Dask workers will generally have as many chunks in memory as the number of core available. Moreover, it is common for a worker to have 2-3 chunks available so it always has something to work on.
</br>Keeping that in mind, if your workers have 4 cores and 1GB chunks, the computation will require *at least* 4GB. Therefore, large chunks should be avoided to always keep your worker busy and without running out of memory.

Very small chunks should also be avoided as discussed below.

See this blog post on [Choosing good chunk size](https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes).

# Avoid Large Graphs

Every operation performed on a chunks will result in a tasks. While the overhead from tasks scheduling is small (usually 200us to 1ms), this can take substantial amount of time for applications with millions of tasks.

You can build smaller graphs by:
- Increasing your chunk size
- Fusing operation together
- Breaking down your computation

# Processes vs Threads

If you're mostly using Numpy, pandas, Scikit-learn, Numba, or other libraries releasing the [GIL](https://docs.python.org/3/glossary.html#term-global-interpreter-lock), then your worker should use threads.
</br>If you're working with Python object, then processes should be used.

On larger machines (10+ threads), you should probably have a few processes each with multiple threads. Threading performance is only efficient up to a certain point.

For information to configure threads, processes, Dask scheduler, see [the scheduler documentation](https://docs.dask.org/en/stable/scheduling.html).