Create and Store Dask DataFrames¶
Dask can create DataFrame objects from various data storage formats. Dask dataframes are commonly used to quickly inspect and analyze large volumes of tabular data stored in CSV, HDF5, or other tabular formats.
See the Overview section for an in-depth discussion of dask.dataframe scope, use, and limitations.
From CSV files¶
dask.dataframe.read_csv uses pandas.read_csv and so inherits most of that function's options. Additionally, it gains two new features:
- You can provide a globstring
>>> df = dd.read_csv('data.*.csv')
- You can specify the size of each block of data in bytes of uncompressed data. Note that, especially for text data, the size on disk may be much less than the number of bytes in memory.
>>> df = dd.read_csv('data.csv', chunkbytes=1000000) # 1MB chunks
For a detailed example of the dask.dataframe.read_csv method, visit the CSV examples section.
From HDF5¶
HDF5 is a Hierarchical Data Format (HDF) designed to store and organize large amounts of data. Similar to the pandas I/O API, dask.dataframe can create a DataFrame directly from HDF5 datasets. For more detailed examples, visit the HDF5 examples section.
dask.dataframe can read a single HDF5 file ('myfile1.hdf5') by referencing a group key ('/x'):
>>> import dask.dataframe as dd
>>> dd.read_hdf('myfile1.hdf5', '/x', chunksize=1000000)
It is also possible to create a DataFrame object from multiple HDF5 files in a directory with similar group keys by using a wildcard character (*). The dask.dataframe syntax for this task is:
>>> import dask.dataframe as dd
>>> dd.read_hdf('myfile*.hdf5', '/x', chunksize=1000000)
Finally, dask.dataframe can load multiple datasets from a single HDF5 file using this syntax:
>>> import dask.dataframe as dd
>>> dd.read_hdf('myfile1.hdf5', '/*', chunksize=1000000)
From an Array¶
You can create a DataFrame from any sliceable array-like object, including both NumPy arrays and HDF5 datasets. For a discussion of dask.array capabilities and instructions on creating dask arrays, see the Array Overview and Create Dask Arrays sections.
>>> dd.from_array(x, chunksize=1000000)
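For instance, here is a minimal, self-contained sketch assuming an in-memory NumPy array; the array contents and chunk size below are made up for illustration:

>>> import numpy as np
>>> import dask.dataframe as dd
>>> x = np.random.random((1000000, 3))        # hypothetical in-memory array
>>> df = dd.from_array(x, chunksize=100000)   # 100,000 rows per partition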
From BColz¶
BColz is an on-disk, chunked, compressed column store. These attributes make it very attractive for dask.dataframe, which can operate on it particularly well. Use the special from_bcolz function:
>>> df = dd.from_bcolz('myfile.bcolz', chunksize=1000000)
In particular, column access on a dask.dataframe backed by a bcolz.ctable will only read the necessary columns from disk. This can provide dramatic performance improvements.
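For example, here is a sketch of column-wise access; the column name 'price' is hypothetical and assumes such a column exists in 'myfile.bcolz':

>>> df = dd.from_bcolz('myfile.bcolz', chunksize=1000000)
>>> df['price'].mean().compute()   # only the 'price' column is read from disk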
From Bags¶
You can create a dask.dataframe from a dask bag using the Bag.to_dataframe([columns]) method, which converts the bag into a dask.dataframe.
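A minimal sketch, assuming each element of the bag is a record with hypothetical 'name' and 'balance' fields:

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'balance': 100},
...                       {'name': 'Bob', 'balance': -200}])
>>> df = b.to_dataframe(columns=['name', 'balance'])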
Using dask.delayed¶
You can create a plan to arrange many Pandas frames into a sequence with normal for loops using dask.delayed and then convert these into a dask dataframe later. See the documentation on using dask.delayed with collections or an example notebook.
Dask.delayed is more useful when simple map operations aren't sufficient to capture the complexity of your data layout.
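For example, here is a sketch that lazily reads a few CSV files with pandas and then assembles them into a single dask dataframe using dd.from_delayed; the file paths are made up:

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> from dask import delayed
>>> filenames = ['data/2000-01-01.csv', 'data/2000-01-02.csv']  # hypothetical paths
>>> dfs = [delayed(pd.read_csv)(fn) for fn in filenames]        # build the plan lazily
>>> df = dd.from_delayed(dfs)                                   # assemble into one dask dataframe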
From Raw Dask Graphs¶
This section is for developer information and discusses internal API. You should never need to create a dataframe object by hand. Instead, you should use dask.delayed.
To construct a DataFrame manually from a dask graph you need the following information:
- dask: a dask graph with keys like {(name, 0): ..., (name, 1): ...}, as well as any other tasks on which those tasks depend. The tasks corresponding to (name, i) should produce pandas.DataFrame objects that correspond to the columns and divisions information discussed below.
- name: The special name used above
- columns: A list of column names
- divisions: A list of index values that separate the different partitions. Alternatively, if you don't know the divisions (this is common), you can provide a list of [None, None, None, ...] with one more entry than you have partitions. For more information, see the Partitions section in the dataframe documentation.
As an example, we build a DataFrame manually that reads several CSV files with a datetime index separated by day. Note that you should never do this in practice; the dd.read_csv function does this for you:
import pandas as pd
from pandas import Timestamp
import dask.dataframe as dd

# One task per partition, each producing a pandas.DataFrame
dsk = {('mydf', 0): (pd.read_csv, 'data/2000-01-01.csv'),
       ('mydf', 1): (pd.read_csv, 'data/2000-01-02.csv'),
       ('mydf', 2): (pd.read_csv, 'data/2000-01-03.csv')}
name = 'mydf'
columns = ['price', 'name', 'id']
# Index values that bound each partition
divisions = [Timestamp('2000-01-01 00:00:00'),
             Timestamp('2000-01-02 00:00:00'),
             Timestamp('2000-01-03 00:00:00'),
             Timestamp('2000-01-03 23:59:59')]

df = dd.DataFrame(dsk, name, columns, divisions)
Read from Distributed Stores¶
Functions like read_csv
can operate from distributed data stores like the
Hadoop File System and S3. To read from these stores prepend the protocol to
your path:
>>> df = dd.read_csv('/path/to/2015-*-*.csv') # local files
>>> df = dd.read_csv('hdfs:///path/to/2015-*-*.csv') # HDFS
>>> df = dd.read_csv('s3://bucket/key/2015-*-*.csv') # Amazon's S3
These functions are more useful if you are connected to a dask.distributed cluster.
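For instance, here is a sketch of connecting first, assuming a recent dask.distributed with the Client interface; the scheduler address below is a placeholder:

>>> from dask.distributed import Client
>>> client = Client('scheduler-address:8786')   # hypothetical scheduler address
>>> df = dd.read_csv('s3://bucket/key/2015-*-*.csv')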
Write DataFrames to Disk¶
The reading functions above typically have writing equivalents. For example, for read_csv there is also to_csv, and for read_hdf there is also to_hdf. Please see the dataframe API for a complete list. These functions typically follow the Pandas interfaces and should be familiar to users coming from that tradition.
These functions prefer to receive a globstring with an asterisk '*':
>>> df.to_csv('out.*.csv')
This maximizes throughput during parallel execution and generates a sequential list of files instead of a single file. See the docstrings of these functions for more information on how to tailor the output files.
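Similarly, here is a sketch of writing to HDF5; the output file name and group key below are made up:

>>> df.to_hdf('output.hdf5', '/data')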