Create Dask Bags¶
There are several ways to create Dask bags around your data:
db.from_sequence¶
You can create a bag from an existing Python iterable:
>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])
You can control the number of partitions into which this data is binned:
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)
This controls the granularity of the parallelism that you expose. By default dask will try to partition your data into about 100 partitions.
IMPORTANT: do not load your data into Python and then load that data into dask.bag. Instead, use dask.bag to load your data. This parallelizes the loading step and reduces inter-worker communication:
>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)
db.read_text¶
Dask.bag can load data directly from text files. You can pass either a single filename, a list of filenames, or a globstring. The resulting bag will have one item per line and one file per partition:
>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')
This handles standard compression libraries like gzip, bz2, xz, or any easily installed compression library that has a file-like object. Compression will be inferred by filename extension, or by using the compression='gzip' keyword:
>>> b = db.read_text('myfile.*.txt.gz')
The resulting items in the bag are strings. If you have encoded data like JSON then you may want to map a decoding or load function across the bag:
>>> import json
>>> b = db.read_text('myfile.*.json').map(json.loads)
Or do string munging tasks. For convenience there is a string namespace attached directly to bags with .str.methodname:
>>> b = db.read_text('myfile.*.csv').str.strip().str.split(',')
db.from_delayed¶
You can construct a dask bag from dask.delayed values using the db.from_delayed function. See the documentation on using dask.delayed with collections for more information.
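As a minimal sketch (the load function and filenames below are illustrative placeholders, not part of the Dask API), each delayed value becomes one partition of the bag:
>>> import dask
>>> import dask.bag as db
>>> @dask.delayed
... def load(filename):
...     # illustrative loader: return the list of records for one partition
...     with open(filename) as f:
...         return [line.strip() for line in f]
>>> partitions = [load(fn) for fn in ['a.dat', 'b.dat', 'c.dat']]
>>> b = db.from_delayed(partitions)  # one partition per delayed value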
Store Dask Bags¶
In Memory¶
You can convert a dask bag to a list or Python iterable by calling compute() or by converting the object into a list:
>>> result = b.compute()
or
>>> result = list(b)
To Textfiles¶
You can convert a dask bag into a sequence of files on disk by calling the .to_textfiles() method:
- dask.bag.core.to_textfiles(b, path, name_function=None, compression='infer', encoding='utf-8', compute=True)¶
Write bag to disk, one filename per partition, one line per element
Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.
Use a globstring
>>> b.to_textfiles('/path/to/data/*.json.gz')
The * will be replaced by the increasing sequence 0, 1, ...
/path/to/data/0.json.gz
/path/to/data/1.json.gz
Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string. Strings produced by name_function must preserve the order of their respective partition indices.
>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...
You can also provide an explicit list of paths.
>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]
>>> b.to_textfiles(paths)
Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.
Bag Contents: The bag calling to_textfiles must be a bag of text strings. For example, a bag of dictionaries could be written to JSON text files by mapping json.dumps onto the bag first, and then calling to_textfiles:
>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")
To DataFrames¶
You can convert a dask bag into a dask dataframe and then use the dataframe's storage solutions, as sketched after the example below.
- Bag.to_dataframe(columns=None)¶
Convert Bag to dask.dataframe
Bag should contain tuples, dict records, or scalars.
Index will not be particularly meaningful. Use reindex afterwards if necessary.
Parameters: columns : pandas.DataFrame or list, optional
If a pandas.DataFrame, it should mirror the column names and dtypes of the output dataframe. If a list, it provides the desired column names. If not provided or a list, a single element from the first partition will be computed, triggering a potentially expensive call to compute. Providing a list is only useful for selecting a subset of columns; to avoid an internal compute call you must provide a pandas.DataFrame, as dask requires dtype knowledge ahead of time.
Examples
>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'balance': 100},
...                       {'name': 'Bob', 'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
>>> df.compute()
   balance     name
0      100    Alice
1      200      Bob
0      300  Charlie
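From there, the dataframe's own storage methods apply. As a minimal sketch (the output path is a placeholder), the result could be written to CSV files, one per partition:
>>> df.to_csv('/path/to/data/export-*.csv')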
To Delayed Values¶
You can convert a dask bag into a list of dask delayed values and build custom storage solutions from there.
- Bag.to_delayed()¶
Convert bag to list of dask Delayed
Returns list of Delayed, one per partition.
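As a minimal sketch of a custom storage step built on to_delayed (the store_partition function and output paths below are hypothetical, not part of Dask):
>>> import dask
>>> @dask.delayed
... def store_partition(items, path):
...     # hypothetical writer: persist one partition's items to `path`
...     with open(path, 'w') as f:
...         for item in items:
...             f.write(str(item) + '\n')
>>> parts = b.to_delayed()
>>> tasks = [store_partition(part, '/path/to/data/part-%d.txt' % i)
...          for i, part in enumerate(parts)]
>>> dask.compute(*tasks)  # execute all of the writes in parallel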