[Benchmark] Add parquet read benchmark (#1371)
Adds a new benchmark for parquet read performance using a `LocalCUDACluster`. The user can pass in `--key` and `--secret` options to specify S3 credentials.

E.g.
```
$ python ./local_read_parquet.py --devs 0,1,2,3,4,5,6,7 --filesystem fsspec --type gpu --file-count 48 --aggregate-files

Parquet read benchmark
--------------------------------------------------------------------------------
Path                      | s3://dask-cudf-parquet-testing/dedup_parquet
Columns                   | None
Backend                   | cudf
Filesystem                | fsspec
Blocksize                 | 244.14 MiB
Aggregate files           | True
Row count                 | 372066
Size on disk              | 1.03 GiB
Number of workers         | 8
================================================================================
Wall clock                | Throughput
--------------------------------------------------------------------------------
36.75 s                   | 28.78 MiB/s
21.29 s                   | 49.67 MiB/s
17.91 s                   | 59.05 MiB/s
================================================================================
Throughput                | 41.77 MiB/s +/- 7.81 MiB/s
Bandwidth                 | 0 B/s +/- 0 B/s
Wall clock                | 25.32 s +/- 8.20 s
================================================================================
...
```
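
The `--key`/`--secret` options mentioned above are ordinary CLI flags on the new script. A hypothetical invocation that supplies credentials explicitly might look like the following (the bucket path and credential values are placeholders; the positional `path` argument and the flags come from the `read_parquet.py` benchmark added here):

```
$ python dask_cuda/benchmarks/read_parquet.py s3://my-bucket/my-parquet-data \
    --devs 0,1 --type gpu --filesystem fsspec --blocksize 256MB \
    --key <AWS_ACCESS_KEY_ID> --secret <AWS_SECRET_ACCESS_KEY>
```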

**Notes**:
- S3 performance generally scales with the total number of threads (the number of workers multiplied by the number of threads per worker)
- The example shown above was not executed from an EC2 instance
- The example shown above *should* perform better after rapidsai/cudf#16657
- Using `--filesystem arrow` together with `--type gpu` performs well, but depends on rapidsai/cudf#16684 (see the sketch below)
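
For the last note, a hypothetical invocation of the `arrow` filesystem path (placeholder bucket path; performance depends on the cudf support tracked in rapidsai/cudf#16684):

```
$ python dask_cuda/benchmarks/read_parquet.py s3://my-bucket/my-parquet-data \
    --devs 0,1,2,3 --type gpu --filesystem arrow --file-count 16
```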

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #1371
rjzamora committed Aug 30, 2024
1 parent b519e39 commit 1cc4d0b
Showing 7 changed files with 279 additions and 39 deletions.
9 changes: 1 addition & 8 deletions dask_cuda/benchmarks/local_cudf_groupby.py
@@ -7,7 +7,7 @@
import dask
import dask.dataframe as dd
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes

from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
@@ -260,13 +260,6 @@ def parse_args():
"type": str,
"help": "Do shuffle with GPU or CPU dataframes (default 'gpu')",
},
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB')",
-        },
{
"name": "--runs",
"default": 3,
9 changes: 1 addition & 8 deletions dask_cuda/benchmarks/local_cudf_merge.py
@@ -9,7 +9,7 @@
import dask
import dask.dataframe as dd
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes

from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
@@ -335,13 +335,6 @@ def parse_args():
"action": "store_true",
"help": "Use shuffle join (takes precedence over '--broadcast-join').",
},
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB')",
-        },
{
"name": "--frac-match",
"default": 0.3,
7 changes: 0 additions & 7 deletions dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -228,13 +228,6 @@ def parse_args():
"type": str,
"help": "Do shuffle with GPU or CPU dataframes (default 'gpu')",
},
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB')",
-        },
{
"name": "--runs",
"default": 3,
9 changes: 1 addition & 8 deletions dask_cuda/benchmarks/local_cupy.py
@@ -8,7 +8,7 @@

from dask import array as da
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes

from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
@@ -297,13 +297,6 @@ def parse_args():
"type": int,
"help": "Chunk size (default 2500).",
},
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB').",
-        },
{
"name": "--runs",
"default": 3,
9 changes: 1 addition & 8 deletions dask_cuda/benchmarks/local_cupy_map_overlap.py
@@ -10,7 +10,7 @@

from dask import array as da
from dask.distributed import performance_report, wait
-from dask.utils import format_bytes, parse_bytes
+from dask.utils import format_bytes

from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
Expand Down Expand Up @@ -168,13 +168,6 @@ def parse_args():
"type": int,
"help": "Kernel size, 2*k+1, in each dimension (default 1)",
},
-        {
-            "name": "--ignore-size",
-            "default": "1 MiB",
-            "metavar": "nbytes",
-            "type": parse_bytes,
-            "help": "Ignore messages smaller than this (default '1 MB')",
-        },
{
"name": "--runs",
"default": 3,
268 changes: 268 additions & 0 deletions dask_cuda/benchmarks/read_parquet.py
@@ -0,0 +1,268 @@
import contextlib
from collections import ChainMap
from time import perf_counter as clock

import fsspec
import pandas as pd

import dask
import dask.dataframe as dd
from dask.base import tokenize
from dask.distributed import performance_report
from dask.utils import format_bytes, parse_bytes

from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.utils import (
parse_benchmark_args,
print_key_value,
print_separator,
print_throughput_bandwidth,
)

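# Module-level caches reused across runs: read options keyed by the benchmark
# arguments, and total on-disk dataset size keyed by the list of file paths.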
DISK_SIZE_CACHE = {}
OPTIONS_CACHE = {}


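# Identity function mapped over each partition so that counting rows in
# bench_once forces the data to actually be read.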
def _noop(df):
return df


def read_data(paths, columns, backend, **kwargs):
with dask.config.set({"dataframe.backend": backend}):
return dd.read_parquet(
paths,
columns=columns,
**kwargs,
)


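# Translate the CLI arguments into a filesystem object, the list of parquet
# paths to read, and extra keyword arguments for dd.read_parquet.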
def get_fs_paths_kwargs(args):
kwargs = {}

storage_options = {}
if args.key:
storage_options["key"] = args.key
if args.secret:
storage_options["secret"] = args.secret

if args.filesystem == "arrow":
import pyarrow.fs as pa_fs
from fsspec.implementations.arrow import ArrowFSWrapper

_mapping = {
"key": "access_key",
"secret": "secret_key",
} # See: pyarrow.fs.S3FileSystem docs
s3_args = {}
for k, v in storage_options.items():
s3_args[_mapping[k]] = v

fs = pa_fs.FileSystem.from_uri(args.path)[0]
try:
region = {"region": fs.region}
except AttributeError:
region = {}
kwargs["filesystem"] = type(fs)(**region, **s3_args)
fsspec_fs = ArrowFSWrapper(kwargs["filesystem"])

if args.type == "gpu":
kwargs["blocksize"] = args.blocksize
else:
fsspec_fs = fsspec.core.get_fs_token_paths(
args.path, mode="rb", storage_options=storage_options
)[0]
kwargs["filesystem"] = fsspec_fs
kwargs["blocksize"] = args.blocksize
kwargs["aggregate_files"] = args.aggregate_files

# Collect list of paths
stripped_url_path = fsspec_fs._strip_protocol(args.path)
if stripped_url_path.endswith("/"):
stripped_url_path = stripped_url_path[:-1]
paths = fsspec_fs.glob(f"{stripped_url_path}/*.parquet")
if args.file_count:
paths = paths[: args.file_count]

return fsspec_fs, paths, kwargs


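# Run a single timed read and return (bytes on disk, row count, wall-clock seconds).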
def bench_once(client, args, write_profile=None):
global OPTIONS_CACHE
global DISK_SIZE_CACHE

# Construct kwargs
token = tokenize(args)
try:
fsspec_fs, paths, kwargs = OPTIONS_CACHE[token]
except KeyError:
fsspec_fs, paths, kwargs = get_fs_paths_kwargs(args)
OPTIONS_CACHE[token] = (fsspec_fs, paths, kwargs)

if write_profile is None:
ctx = contextlib.nullcontext()
else:
ctx = performance_report(filename=args.profile)

with ctx:
t1 = clock()
df = read_data(
paths,
columns=args.columns,
backend="cudf" if args.type == "gpu" else "pandas",
**kwargs,
)
num_rows = len(
# Use opaque `map_partitions` call to "block"
# dask-expr from using pq metadata to get length
df.map_partitions(
_noop,
meta=df._meta,
enforce_metadata=False,
)
)
t2 = clock()

# Extract total size of files on disk
token = tokenize(paths)
try:
disk_size = DISK_SIZE_CACHE[token]
except KeyError:
disk_size = sum(fsspec_fs.sizes(paths))
DISK_SIZE_CACHE[token] = disk_size

return (disk_size, num_rows, t2 - t1)


def pretty_print_results(args, address_to_index, p2p_bw, results):
if args.markdown:
print("```")
print("Parquet read benchmark")
data_processed, row_count, durations = zip(*results)
print_separator(separator="-")
backend = "cudf" if args.type == "gpu" else "pandas"
print_key_value(key="Path", value=args.path)
print_key_value(key="Columns", value=f"{args.columns}")
print_key_value(key="Backend", value=f"{backend}")
print_key_value(key="Filesystem", value=f"{args.filesystem}")
print_key_value(key="Blocksize", value=f"{format_bytes(args.blocksize)}")
print_key_value(key="Aggregate files", value=f"{args.aggregate_files}")
print_key_value(key="Row count", value=f"{row_count[0]}")
print_key_value(key="Size on disk", value=f"{format_bytes(data_processed[0])}")
if args.markdown:
print("\n```")
args.no_show_p2p_bandwidth = True
print_throughput_bandwidth(
args, durations, data_processed, p2p_bw, address_to_index
)
print_separator(separator="=")


def create_tidy_results(args, p2p_bw, results):
configuration = {
"path": args.path,
"columns": args.columns,
"backend": "cudf" if args.type == "gpu" else "pandas",
"filesystem": args.filesystem,
"blocksize": args.blocksize,
"aggregate_files": args.aggregate_files,
}
timing_data = pd.DataFrame(
[
pd.Series(
data=ChainMap(
configuration,
{
"wallclock": duration,
"data_processed": data_processed,
"num_rows": num_rows,
},
)
)
for data_processed, num_rows, duration in results
]
)
return timing_data, p2p_bw


def parse_args():
special_args = [
{
"name": "path",
"type": str,
"help": "Parquet directory to read from (must be a flat directory).",
},
{
"name": "--blocksize",
"default": "256MB",
"type": parse_bytes,
"help": "How to set the blocksize option",
},
{
"name": "--aggregate-files",
"default": False,
"action": "store_true",
"help": "How to set the aggregate_files option",
},
{
"name": "--file-count",
"type": int,
"help": "Maximum number of files to read.",
},
{
"name": "--columns",
"type": str,
"help": "Columns to read/select from data.",
},
{
"name": "--key",
"type": str,
"help": "Public S3 key.",
},
{
"name": "--secret",
"type": str,
"help": "Secret S3 key.",
},
{
"name": [
"-t",
"--type",
],
"choices": ["cpu", "gpu"],
"default": "gpu",
"type": str,
"help": "Use GPU or CPU dataframes (default 'gpu')",
},
{
"name": "--filesystem",
"choices": ["arrow", "fsspec"],
"default": "fsspec",
"type": str,
"help": "Filesystem backend",
},
{
"name": "--runs",
"default": 3,
"type": int,
"help": "Number of runs",
},
]

args = parse_benchmark_args(
description="Parquet read benchmark",
args_list=special_args,
check_explicit_comms=False,
)
args.no_show_p2p_bandwidth = True
return args


if __name__ == "__main__":
execute_benchmark(
Config(
args=parse_args(),
bench_once=bench_once,
create_tidy_results=create_tidy_results,
pretty_print_results=pretty_print_results,
)
)
7 changes: 7 additions & 0 deletions dask_cuda/benchmarks/utils.py
@@ -337,6 +337,13 @@ def parse_benchmark_args(
"If the files already exist, new files are created with a uniquified "
"BASENAME.",
)
+    parser.add_argument(
+        "--ignore-size",
+        default="1 MiB",
+        metavar="nbytes",
+        type=parse_bytes,
+        help="Bandwidth statistics: ignore messages smaller than this (default '1 MB')",
+    )

for args in args_list:
name = args.pop("name")
