Create and Store Dask DataFrames

Dask can create DataFrame objects from various data storage formats. Dask dataframes are commonly used to quickly inspect and analyze large volumes of tabular data stored in CSV, HDF5, or other tabular formats.

See the Overview section for an in-depth discussion of dask.dataframe scope, use, and limitations.

From CSV files

dask.dataframe.read_csv uses pandas.read_csv and so inherits most of that function's options. Additionally, it gains two new features:

  1. You can provide a globstring:
>>> df = dd.read_csv('data.*.csv')
  2. You can specify the size of each block of data in bytes of uncompressed data. Note that, especially for text data, the size on disk may be much smaller than the number of bytes in memory:
>>> df = dd.read_csv('data.csv', chunkbytes=1000000)  # 1MB chunks
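
Because the remaining keyword arguments are forwarded to pandas.read_csv, the usual pandas options work as well. A minimal sketch, assuming hypothetical column names 'timestamp' and 'id':

>>> df = dd.read_csv('data.*.csv', sep=';',
...                  parse_dates=['timestamp'],
...                  dtype={'id': 'int64'})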

For a detailed example of the dask.dataframe.read_csv method, visit the CSV examples section.

From HDF5

HDF5 is a Hierarchical Data Format (HDF) designed to store and organize large amounts of data. Similar to the pandas I/O API, dask.dataframe can create a DataFrame directly from HDF5 datasets. For more detailed examples, visit the HDF5 examples section.

dask.dataframe can read a single HDF5 file ('myfile1.hdf5') by referencing a group key ('/x'):

>>> import dask.dataframe as dd
>>> dd.read_hdf('myfile1.hdf5', '/x', chunksize=1000000)

It is also possible to create a DataFrame object from multiple HDF5 files in a directory with similar group keys by using a wildcard character (*). The dask.dataframe syntax for this task is:

>>> import dask.dataframe as dd
>>> dd.read_hdf('myfile*.hdf5', '/x', chunksize=1000000)

Finally, dask.dataframe can load multiple datasets from a single HDF5 file using this syntax:

>>> import dask.dataframe as dd
>>> dd.read_hdf('myfile1.hdf5', '/*', chunksize=1000000)

From an Array

You can create a DataFrame from any sliceable array-like object, including both NumPy arrays and HDF5 datasets. For a discussion of dask.array capabilities and instructions on creating dask arrays, see the Array Overview and Create Dask Arrays sections.

>>> dd.from_array(x, chunksize=1000000)
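
A minimal sketch with made-up data, using a NumPy record array whose named fields become the DataFrame columns:

>>> import numpy as np
>>> import dask.dataframe as dd
>>> x = np.array([(1, 2.0), (2, 3.5), (3, 4.1)],
...              dtype=[('id', 'i8'), ('value', 'f8')])
>>> df = dd.from_array(x, chunksize=2)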

From BColz

BColz is an on-disk, chunked, compressed column store. These attributes make it very attractive for dask.dataframe, which can operate on it particularly well. There is a special from_bcolz function:

>>> df = dd.from_bcolz('myfile.bcolz', chunksize=1000000)

In particular, column access on a dask.dataframe backed by a bcolz.ctable will only read the necessary columns from disk. This can provide dramatic performance improvements.
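
For example, selecting a single column (assuming the table has a 'price' column) touches only that column's data on disk:

>>> prices = df['price']  # only the 'price' column is read from disk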

From Bags

You can create a dask.dataframe from a dask bag:

dask.bag.core.Bag.to_dataframe([columns]): Convert a Bag to a dask.dataframe
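
A minimal sketch, building a small in-memory bag of records (made-up data) and converting it:

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'amount': 100},
...                       {'name': 'Bob', 'amount': 200}])
>>> df = b.to_dataframe()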

Using dask.delayed

You can create a plan to arrange many Pandas frames into a sequence with normal for loops using dask.delayed and then convert these into a dask dataframe later. See the documentation on using dask.delayed with collections or an example notebook. dask.delayed is most useful when simple map operations aren't sufficient to capture the complexity of your data layout.
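
A minimal sketch of this pattern, assuming a short list of hypothetical CSV paths: each pandas.read_csv call is deferred with dask.delayed and the results are combined with dd.from_delayed.

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> from dask import delayed
>>> filenames = ['data/2000-01-01.csv', 'data/2000-01-02.csv']  # hypothetical paths
>>> dfs = [delayed(pd.read_csv)(fn) for fn in filenames]
>>> df = dd.from_delayed(dfs)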

From Raw Dask Graphs

This section is for developer information and discusses the internal API. You should never need to create a DataFrame object by hand; instead, use dask.delayed.

To construct a DataFrame manually from a dask graph you need the following information:

  1. dask: a dask graph with keys like {(name, 0): ..., (name, 1): ...} as well as any other tasks on which those tasks depend. The tasks corresponding to (name, i) should produce pandas.DataFrame objects that correspond to the columns and divisions information discussed below.
  2. name: The special name used above
  3. columns: A list of column names
  4. divisions: A list of index values that separate the different partitions. Alternatively, if you don't know the divisions (this is common), you can provide a list of [None, None, None, ...] with one more entry than the number of partitions. For more information, see the Partitions section in the dataframe documentation.

As an example, we build a DataFrame manually that reads several CSV files with a datetime index separated by day. Note: you should never do this yourself; the dd.read_csv function does it for you.

import pandas as pd
from pandas import Timestamp
import dask.dataframe as dd

dsk = {('mydf', 0): (pd.read_csv, 'data/2000-01-01.csv'),
       ('mydf', 1): (pd.read_csv, 'data/2000-01-02.csv'),
       ('mydf', 2): (pd.read_csv, 'data/2000-01-03.csv')}
name = 'mydf'
columns = ['price', 'name', 'id']
divisions = [Timestamp('2000-01-01 00:00:00'),
             Timestamp('2000-01-02 00:00:00'),
             Timestamp('2000-01-03 00:00:00'),
             Timestamp('2000-01-03 23:59:59')]

df = dd.DataFrame(dsk, name, columns, divisions)

Read from Distributed Stores

Functions like read_csv can operate on distributed data stores such as the Hadoop File System (HDFS) and Amazon S3. To read from these stores, prepend the protocol to your path:

>>> df = dd.read_csv('/path/to/2015-*-*.csv')  # local files
>>> df = dd.read_csv('hdfs:///path/to/2015-*-*.csv')  # HDFS
>>> df = dd.read_csv('s3://bucket/key/2015-*-*.csv')  # Amazon's S3

These functions are more useful if you are connected to a dask.distributed cluster.

Write DataFrames to Disk

The reading functions above typically have writing equivalents. For example, read_csv has a corresponding to_csv, and read_hdf has a corresponding to_hdf. Please see the dataframe API for a complete list. These functions typically follow the Pandas interfaces and should be familiar to users coming from that tradition.

These functions prefer to receive a globstring with an asterisk '*':

>>> df.to_csv('out.*.csv')

This maximizes throughput during parallel execution and generates a sequential list of files instead of a single file. See the docstrings of these functions for more information on how to tailor the output files.
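
to_hdf follows the same pattern, writing one file per partition when the path contains an asterisk; a sketch with hypothetical file and key names:

>>> df.to_hdf('out-*.hdf5', '/data')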