{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Best Practices" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "This section is a summary of the [official Dask Best Practices](https://docs.dask.org/en/stable/best-practices.html)." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Dashboard" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "The [Dask dashboard](https://docs.dask.org/en/stable/dashboard.html) is a great tool to debug and monitor applications." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2023-02-14 19:34:14,653 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-rghli8_x', purging\n", "2023-02-14 19:34:14,653 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-oex76xjk', purging\n", "2023-02-14 19:34:14,653 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-0308cuzr', purging\n", "2023-02-14 19:34:14,654 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-vkz9v6i0', purging\n", "2023-02-14 19:34:14,654 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-lcamozfp', purging\n", "2023-02-14 19:34:14,654 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-0jaeddsa', purging\n" ] }, { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-78f40d7b-acc8-11ed-a95a-00155df81054

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Cluster objectCluster type: distributed.LocalCluster
\n", " Dashboard: http://127.0.0.1:8787/status\n", "
\n", "\n", " \n", "\n", " \n", "
\n", "

Cluster Info

\n", "
\n", "
\n", "
\n", "
\n", "

LocalCluster

\n", "

282d9198

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", " \n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Workers: 6\n", "
\n", " Total threads: 24\n", " \n", " Total memory: 31.32 GiB\n", "
Status: runningUsing processes: True
\n", "\n", "
\n", " \n", "

Scheduler Info

\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-ba7d69ef-b9a5-4c5a-8913-47a0fbf9e729

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://127.0.0.1:44263\n", " \n", " Workers: 6\n", "
\n", " Dashboard: http://127.0.0.1:8787/status\n", " \n", " Total threads: 24\n", "
\n", " Started: Just now\n", " \n", " Total memory: 31.32 GiB\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 0

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:44785\n", " \n", " Total threads: 4\n", "
\n", " Dashboard: http://127.0.0.1:33163/status\n", " \n", " Memory: 5.22 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:38107\n", "
\n", " Local directory: /tmp/dask-worker-space/worker-_c8x9oc0\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 1

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:36109\n", " \n", " Total threads: 4\n", "
\n", " Dashboard: http://127.0.0.1:37257/status\n", " \n", " Memory: 5.22 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:35249\n", "
\n", " Local directory: /tmp/dask-worker-space/worker-wwflm5fb\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 2

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:42539\n", " \n", " Total threads: 4\n", "
\n", " Dashboard: http://127.0.0.1:37003/status\n", " \n", " Memory: 5.22 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:40833\n", "
\n", " Local directory: /tmp/dask-worker-space/worker-achbs4g5\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 3

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:38295\n", " \n", " Total threads: 4\n", "
\n", " Dashboard: http://127.0.0.1:42121/status\n", " \n", " Memory: 5.22 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:36989\n", "
\n", " Local directory: /tmp/dask-worker-space/worker-tp8ybf1y\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 4

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:34687\n", " \n", " Total threads: 4\n", "
\n", " Dashboard: http://127.0.0.1:34207/status\n", " \n", " Memory: 5.22 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:42351\n", "
\n", " Local directory: /tmp/dask-worker-space/worker-lx14z7jh\n", "
\n", "
\n", "
\n", "
\n", " \n", "
\n", "
\n", "
\n", "
\n", " \n", "

Worker: 5

\n", "
\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "\n", " \n", "\n", "
\n", " Comm: tcp://127.0.0.1:40915\n", " \n", " Total threads: 4\n", "
\n", " Dashboard: http://127.0.0.1:42191/status\n", " \n", " Memory: 5.22 GiB\n", "
\n", " Nanny: tcp://127.0.0.1:43815\n", "
\n", " Local directory: /tmp/dask-worker-space/worker-ywd6a1_z\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client\n", "client = Client() # start distributed scheduler locally.\n", "client" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 1, 2, 3, ..., 999998, 999999, 1000000])" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import random\n", "import time\n", "\n", "import dask.array as da\n", "\n", "def f(x):\n", "    # Sleep for up to one second to simulate a slow task\n", "    time.sleep(random.random())\n", "    return x + 1\n", "\n", "\n", "arr = da.arange(1_000_000, chunks=1000)\n", "arr.map_blocks(f).compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Handling Computation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Stop Using Dask When No Longer Needed" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Many workloads read a large amount of data, reduce it, and then iterate on the much smaller result.\n", "
For the last step, it makes sense to stop using Dask and switch back to plain pandas or NumPy, since the reduced data fits in memory." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import dask\n", "\n", "df = dask.datasets.timeseries() # Read data\n", "df = df.groupby(\"name\").mean() # Reduce data significantly\n", "df = df.compute() # Continue with pandas/NumPy" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Avoid Calling `compute` Repeatedly" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Compute related results that share intermediate computations in a single `dask.compute()` call.\n", "\n", "```python\n", "# Don't repeatedly call compute\n", "import dask.dataframe as dd\n", "\n", "df = dd.read_csv(\"...\")\n", "xmin = df.x.min().compute()\n", "xmax = df.x.max().compute()\n", "```\n", "\n", "```python\n", "# Do compute multiple results at the same time\n", "import dask\n", "import dask.dataframe as dd\n", "\n", "df = dd.read_csv(\"...\")\n", "\n", "xmin, xmax = dask.compute(df.x.min(), df.x.max())\n", "```\n", "\n", "This allows Dask to compute the shared parts of the computation (like the `dd.read_csv` call above) only once, rather than once per `compute` call.\n", "\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Fusing Custom Computations" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "While the high-level Dask collections (array, DataFrame, bag) offer common operations, they might not accommodate more complex ones.\n", "
Dask offers several lower-level functions to handle complex workflows.\n", "\n", "All collections have a `map_partitions` or `map_blocks` function, which applies a custom function to each partition or block.\n", "\n", "|**Function**|**Description**|\n", "|:-----------|:--------------|\n", "|`map_partitions`(func, *args[, meta, ...])|Apply a Python function to each DataFrame partition. ([documentation](https://docs.dask.org/en/stable/generated/dask.dataframe.map_partitions.html#dask.dataframe.map_partitions))|\n", "|`rolling.map_overlap`(func, df, before, after, ...)|Apply a function to each partition, sharing rows with adjacent partitions. ([documentation](https://docs.dask.org/en/stable/generated/dask.dataframe.rolling.map_overlap.html#dask.dataframe.rolling.map_overlap))|\n", "|`groupby.Aggregation`(name, chunk, agg[, finalize])|User-defined groupby aggregation. ([documentation](https://docs.dask.org/en/stable/generated/dask.dataframe.groupby.Aggregation.html#dask.dataframe.groupby.Aggregation))|\n", "|`blockwise`(func, out_ind, *args[, name, ...])|Tensor operation: generalized inner and outer products. ([documentation](https://docs.dask.org/en/stable/generated/dask.array.blockwise.html#dask.array.blockwise))|\n", "|`map_blocks`(func, *args[, name, token, ...])|Map a function across all blocks of a Dask array. ([documentation](https://docs.dask.org/en/stable/generated/dask.array.map_blocks.html#dask.array.map_blocks))|\n", "|`map_overlap`(func, *args[, depth, boundary, ...])|Map a function over blocks of arrays with some overlap. ([documentation](https://docs.dask.org/en/stable/generated/dask.array.map_overlap.html#dask.array.map_overlap))|\n", "|`reduction`(x, chunk, aggregate[, axis, ...])|General version of reductions. ([documentation](https://docs.dask.org/en/stable/generated/dask.array.reduction.html#dask.array.reduction))|\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
nameidxy
npartitions=30
2000-01-01objectint64float64float64
2000-01-02............
...............
2000-01-30............
2000-01-31............
\n", "
\n", "
Dask Name: make-timeseries, 1 graph layer
" ], "text/plain": [ "Dask DataFrame Structure:\n", " name id x y\n", "npartitions=30 \n", "2000-01-01 object int64 float64 float64\n", "2000-01-02 ... ... ... ...\n", "... ... ... ... ...\n", "2000-01-30 ... ... ... ...\n", "2000-01-31 ... ... ... ...\n", "Dask Name: make-timeseries, 1 graph layer" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask\n", "\n", "df = dask.datasets.timeseries()\n", "df" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idxy
name
Edith24989279.2518.55231768.150218
Hannah25055749.50119.33519519.475616
Oliver24773609.75-18.26240220.378895
Sarah24910480.50-8.92135112.455201
Victor24973653.0038.03646238.836490
\n", "
" ], "text/plain": [ " id x y\n", "name \n", "Edith 24989279.25 18.552317 68.150218\n", "Hannah 25055749.50 119.335195 19.475616\n", "Oliver 24773609.75 -18.262402 20.378895\n", "Sarah 24910480.50 -8.921351 12.455201\n", "Victor 24973653.00 38.036462 38.836490" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask.dataframe as dd\n", "\n", "custom_mean = dd.Aggregation(\n", "    name=\"custom_mean\",\n", "    # Per partition: count and sum of each group\n", "    chunk=lambda c: (c.count(), c.sum()),\n", "    # Across partitions: add up the partial counts and partial sums\n", "    agg=lambda count, sum_: (count.sum(), sum_.sum()),\n", "    finalize=lambda count, sum_: sum_ / count,\n", ")\n", "\n", "df.groupby(\"name\").agg(custom_mean).head()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Handling Data" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Loading Data" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Dask should be used to create large objects from data. It is a common anti-pattern to create those large Python objects outside of Dask and then hand them over.\n", "\n", "\n", "DataFrames:\n", "```python\n", "# Don't\n", "\n", "ddf = ... a dask dataframe ...\n", "for fn in filenames:\n", "    df = pandas.read_csv(fn) # Read locally with pandas\n", "    ddf = ddf.append(df) # Give to Dask\n", "```\n", "\n", "```python\n", "# Do\n", "import dask.dataframe as dd\n", "\n", "ddf = dd.read_csv(filenames)\n", "```\n", "\n", "Arrays:\n", "```python\n", "# Don't\n", "\n", "f = h5py.File(...)\n", "x = np.asarray(f[\"x\"]) # Get data as a NumPy array locally\n", "\n", "x = da.from_array(x) # Hand NumPy array to Dask\n", "```\n", "\n", "```python\n", "# Do\n", "import dask.array as da\n", "import h5py\n", "\n", "f = h5py.File(...)\n", "x = da.from_array(f[\"x\"]) # Let Dask do the reading\n", "```" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Persisting Data" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Accessing data from RAM is much faster than from storage. 
You should persist clean data that both:\n", "- Fits in memory\n", "- Will be used for many analyses" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(,)" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask\n", "\n", "# Read your data\n", "df = dask.datasets.timeseries()\n", "\n", "# Apply some filter\n", "df = df[df[\"name\"] != \"Alice\"]\n", "\n", "# Persist the filtered data in memory so that later aggregations reuse it\n", "df = df.persist()\n", "\n", "by_name = df.groupby(\"name\")" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idxy
name
Bob31.5836480.5776020.579210
Charlie31.6223930.5774090.575748
Dan31.6085070.5771930.578075
Edith31.6766650.5771420.577230
Frank31.5610950.5770490.577963
\n", "
" ], "text/plain": [ " id x y\n", "name \n", "Bob 31.583648 0.577602 0.579210\n", "Charlie 31.622393 0.577409 0.575748\n", "Dan 31.608507 0.577193 0.578075\n", "Edith 31.676665 0.577142 0.577230\n", "Frank 31.561095 0.577049 0.577963" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "by_name.sum().head()\n", "by_name.mean().head()\n", "by_name.std().head()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Scatter Data" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Sometimes an object needs to be available to most workers to perform a computation.\n", "
In that case, you can scatter (or broadcast) the data across the cluster.\n", "\n", "For example, this is useful when joining a small dataset to a larger one. Without scattering the data, you would have to shuffle the larger dataset across the cluster, which is a costly operation.\n", "\n", "Small broadcast variable:\n", "```python\n", "def func(partition, d):\n", "    return ...\n", "\n", "my_dict = {...}\n", "\n", "b = b.map(func, d=my_dict)\n", "```\n", "\n", "Large broadcast variable:\n", "```python\n", "my_dict = dask.delayed(my_dict)\n", "\n", "b = b.map(func, d=my_dict)\n", "```\n", "\n", "Very large broadcast variable:\n", "```python\n", "[my_dict] = client.scatter([my_dict])\n", "\n", "b = b.map(func, d=my_dict)\n", "```" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Optimized File Format" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Efficient storage formats for different data structures allow faster access and lower storage cost.\n", "
Moreover, these formats often store metadata, which remains accessible even when the data is read lazily.\n", "\n", "Some examples include:\n", "- DataFrames: [Parquet](https://parquet.apache.org/)\n", "- Arrays: [Zarr](https://zarr.readthedocs.io/en/stable/)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Avoid Large Chunks" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask workers will generally have as many chunks in memory as the number of cores available. Moreover, it is common for a worker to hold 2-3 times as many chunks so that it always has something to work on.\n", "
Keeping that in mind, if each worker has 4 cores and chunks are 1 GB, the computation will require *at least* 4 GB of memory per worker. Therefore, avoid large chunks so that your workers stay busy without running out of memory.\n", "\n", "Very small chunks should also be avoided, as discussed below.\n", "\n", "See this blog post on [choosing good chunk sizes](https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Avoid Large Graphs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Every operation performed on a chunk results in a task. While the scheduling overhead per task is small (usually 200 µs to 1 ms), it can add up to a substantial amount of time for applications with millions of tasks: one million tasks at 1 ms each amount to about 1,000 seconds of overhead.\n", "\n", "You can build smaller graphs by:\n", "- Increasing your chunk size\n", "- Fusing operations together\n", "- Breaking down your computation" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Processes vs Threads" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "If you're mostly using NumPy, pandas, scikit-learn, Numba, or other libraries that release the [GIL](https://docs.python.org/3/glossary.html#term-global-interpreter-lock), then your workers should use threads.\n", "
If you're mostly working with pure Python objects, such as text or collections like lists and dicts, then processes should be used.\n", "\n", "On larger machines (10+ threads), you should probably have a few processes, each with multiple threads, because threading within a single process only scales up to a certain point.\n", "\n", "For information on configuring threads, processes, and the Dask scheduler, see [the scheduler documentation](https://docs.dask.org/en/stable/scheduling.html)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "big-data-tutorial-101", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.6" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "7aed4e4d455db0df64e73da1eb387dfb34ca30e0f855d7b7790daae2643b7068" } } }, "nbformat": 4, "nbformat_minor": 2 }