In a recent post titled Working with Large CSV files in Python, I shared an approach I use for CSV files (and other file types) that are too large to load into memory. While that approach works well, it can be tedious to first load the data into SQLite (or any other database) and then query that database to analyze the data. I just found a better approach using Dask.
While looking around the web to learn about some parallel processing capabilities, I ran across a Python module named Dask, which describes itself as:
…is a flexible parallel computing library for analytic computing.
When I saw that, I was intrigued. There's a lot that can be done with a library like that, and I've got plans to introduce Dask into my various toolsets for data analytics.
While reading the docs, I ran across the 'dataframe' concept and immediately knew I'd found a new tool for working with large CSV files. With Dask's dataframe concept, you can do out-of-core analysis (e.g., analyze the data in the CSV without loading the entire file into memory). Beyond out-of-core manipulation, dask's dataframe uses the pandas API, which makes things extremely easy for those of us who use and love pandas.
With Dask and its dataframe construct, you set up the dataframe much like you would in pandas, but rather than loading the data into memory, this approach keeps the dataframe as a sort of 'pointer' to the data file and doesn't load anything until you specifically tell it to do so.
One note (that I always have to share): if you are planning on working with your data set over time, it's probably best to get the data into a database of some type.
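For reference, here's a minimal sketch of the chunked-load approach from that earlier post, assuming a local SQLite file; the database and table names here are hypothetical:

import sqlite3
import pandas as pd

csv_file = '311_Service_Requests.csv'
conn = sqlite3.connect('service_requests.db')  # hypothetical database file

# read the CSV in manageable chunks and append each chunk to a table
for chunk in pd.read_csv(csv_file, chunksize=100000, dtype=str):
    chunk.to_sql('requests', conn, if_exists='append', index=False)

conn.close()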
An example using Dask and the Dataframe
First, let’s get everything installed. The documentation claims that you just need to install dask, but I had to install ‘toolz’ and ‘cloudpickle’ to get dask’s dataframe to import. To install dask and its requirements, open a terminal and type (you need pip for this):
pip install dask[complete]
NOTE: I mistakenly had "pip install dask" listed initially. This only installs the base dask system and not the dataframe (and other dependencies). Thanks to Kevin for pointing this out.
Now, let's write some code to load CSV data and start analyzing it. For this example, I'm using the 311 Service Requests dataset from NYC's Open Data portal. You can download the dataset here: 311 Service Requests – 7GB+ CSV
Set up your dataframe so you can analyze the 311_Service_Requests.csv file. This file is assumed to be stored in the directory that you are working in.
import dask.dataframe as dd

filename = '311_Service_Requests.csv'
df = dd.read_csv(filename, dtype='str')
Unlike pandas, the data isn't read into memory…we've just set up the dataframe to be ready to run compute functions on the data in the CSV file using familiar functions from pandas. Note: I used dtype='str' in the read_csv to get around some strange formatting issues in this particular file.
Let's take a look at the first few rows of the file using pandas' head() call. When you run head(X), Dask reads in the first X rows of the file and displays them.
df.head()
Note: a small subset of the columns is shown below for simplicity
| Unique Key | Created Date | Closed Date | Agency |
|---|---|---|---|
| 25513481 | 05/09/2013 12:00:00 AM | 05/14/2013 12:00:00 AM | HPD |
| 25513482 | 05/09/2013 12:00:00 AM | 05/13/2013 12:00:00 AM | HPD |
| 25513483 | 05/09/2013 12:00:00 AM | 05/22/2013 12:00:00 AM | HPD |
| 25513484 | 05/09/2013 12:00:00 AM | 05/12/2013 12:00:00 AM | HPD |
| 25513485 | 05/09/2013 12:00:00 AM | 05/11/2013 12:00:00 AM | HPD |
We see that there are some spaces in the column names. Let's remove those spaces to make things easier to work with.
df = df.rename(columns={c: c.replace(' ', '') for c in df.columns})
The cool thing about dask is that you can do things like renaming columns without loading all the data into memory.
There's a column in this data called 'Descriptor' that holds the problem types, and 'RADIATOR' is one of those types. Let's take a look at how many service requests were due to some problem with a radiator. To do this, you can filter the dataframe using standard pandas filtering (see below) to create a new dataframe.
# create a new dataframe with only 'RADIATOR' service calls
radiator_df = df[df.Descriptor == 'RADIATOR']
Let's see how many rows we have using the count() command:
radiator_df.Descriptor.count()
You'll notice that when you run the above command, you don't actually get the count returned. You get a lazy scalar back that looks something like "dd.Scalar<series-…, dtype=int64>".
To actually compute the count, you have to call “compute” to get dask to run through the dataframe and count the number of records.
radiator_df.Descriptor.count().compute()

When you run this command, you should get back the count: 52077 radiator-related service requests. (If you instead call radiator_df.compute(), Dask materializes the entire filtered dataframe, and the output ends with [52077 rows x 52 columns].)
The above are just a few samples of using dask's dataframe construct. Remember, we built a new dataframe using pandas' filters without loading the entire original data set into memory. These examples may not seem like much, but when working with a 7GB+ file, you can save a great deal of time and effort using dask compared to the approach I previously described.
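As another quick illustration of the same pattern (this one isn't in the original walk-through), you could tally requests by agency without ever pulling the full file into memory; the aggregation stays lazy until compute() is called:

# count service requests per agency across the whole file
agency_counts = df.Agency.value_counts()
print(agency_counts.compute().head(10))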
Dask seems to have a ton of other great features that I’ll be diving into at some point in the near future, but for now, the dataframe construct has been an awesome find.
I was confused, too, when
import dask.dataframe as dd
didn't just work after I saw somebody do it in a conference video. It's actually mentioned in the documentation: you need to do
pip install dask[dataframe]
to get those extra dependencies, like toolz and cloudpickle.

Hey Kevin –
You are correct!
I missed that. When I installed dask, I actually did install dask[complete]. Thanks for the catch. I'm going to edit the post with a note about the mistake.
Why do we need to install it twice, or what is that based on?
You only install it once, using:
pip install dask[complete]
Unfortunately, dask doesn't work with gzip files. You get this warning:

Warning gzip compression does not support breaking apart files. Please ensure that each individual file can fit in memory and use the keyword blocksize=None to remove this message. Setting blocksize=None
Good to know. Thanks!
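For what it's worth, you can still read a gzipped CSV by passing blocksize=None yourself, so each file is read as a single block (and must fit in memory). A sketch with a hypothetical file name:

import dask.dataframe as dd

# gzip can't be split, so each file is read as one block that must fit in memory
df = dd.read_csv('311_Service_Requests.csv.gz',
                 compression='gzip',
                 blocksize=None,
                 dtype='str')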
Unfortunately, dask doesn't work with missing values!
For example, if one column of the df has some NA values then you can't compute the mean!
You should be able to use Dask’s
DataFrameGroupBy.mean()
to do this…it excludes missing data.

Can I use dask to compare 2 data frames, like source and target data?
Using pandas, I was getting memory issues on an 8GB laptop (Windows 10, 64-bit Python).
The requirement is to compare 2 dataframes once they're read from a DB or CSV.
You can. There are plenty of examples out there on the web on how to do that (I don't have any examples at the moment).
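At a high level, the comparison could be set up as a merge on a key column and a filter on the rows that differ. A rough sketch, with hypothetical file and column names:

import dask.dataframe as dd

# hypothetical source/target extracts that share a 'UniqueKey' column
source = dd.read_csv('source.csv', dtype='str')
target = dd.read_csv('target.csv', dtype='str')

# join on the key and flag rows whose 'Status' value differs between the two
merged = source.merge(target, on='UniqueKey', suffixes=('_src', '_tgt'))
diffs = merged[merged.Status_src != merged.Status_tgt]

print(len(diffs))   # triggers the computation and returns the number of mismatches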
I like dask, but I can't seem to get around the bottleneck of using compute(). I need to create a thousand sums, and using dask is significantly slower than pandas. Where can I find information on optimizing dask?
That is one of the downsides of Dask. Here's a good place to start: http://dask.pydata.org/en/latest/optimize.html
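One pattern that sometimes helps (just a sketch, with hypothetical column names) is to build all of the lazy sums first and hand them to dask.compute() in a single call, so Dask can share one pass over the data instead of re-reading it for every .compute():

import dask
import dask.dataframe as dd

df = dd.read_csv('311_Service_Requests.csv', dtype='str')

# build the lazy sums first (ColumnA/ColumnB are hypothetical numeric columns)
lazy_sums = [df[col].astype(float).sum() for col in ['ColumnA', 'ColumnB']]

# then compute them together in one shot
results = dask.compute(*lazy_sums)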
Just discovered Dask. I have a large (8GB) CSV which I am trying to sort by two columns, which Dask does not support. Is there a workaround using maybe groupby and apply?
I bet you could do a groupby and shuffle (or something similar) to do what you need. I've never actually done it but worth a shot. http://dask.pydata.org/en/latest/dataframe-groupby.html
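Along those lines, something like this might work as a starting point; it's only a sketch with hypothetical column names, and each group has to fit in memory for the apply step:

import dask.dataframe as dd

df = dd.read_csv('large_file.csv')   # hypothetical 8GB file

# sort the rows within each col_a group by col_b;
# Dask will warn that it has to guess the output metadata unless you pass meta=
sorted_df = df.groupby('col_a').apply(lambda pdf: pdf.sort_values('col_b'))

# write the result out in pieces rather than pulling it all into memory
sorted_df.to_csv('sorted-*.csv')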
Thanks for the great work!
I am entirely new to Python and ML; could you please guide me with my use case?
I have a large input file (~12GB) and I want to run certain checks/validations like counts, distinct columns, column types, and so on. Could you please advise me on using dask and pandas, maybe reading the file in chunks and aggregating? Also, this input file is not predefined.
Everything you want to do is easily accessible via Dask and Pandas. I'd suggest reading the documentation for both.
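A rough sketch of those checks with Dask (the file name is hypothetical; everything stays lazy until you ask for a result):

import dask.dataframe as dd

df = dd.read_csv('big_input.csv', dtype='str')   # ~12GB file, read lazily

print(df.columns)        # column names
print(df.dtypes)         # column types (all strings here because of dtype='str')
print(len(df))           # total row count; this triggers a pass over the file

# distinct values in one (hypothetical) column
print(df['some_column'].nunique().compute())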
I keep getting a "kernel died" error after running .compute(). Is it due to memory?
If your output dataset is larger than available memory, it will do this. See here for an example: https://github.com/dask/dask/issues/2585
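If the result really is larger than RAM, one workaround (just a sketch, reusing the radiator example from the post) is to write it straight to disk instead of materializing it with .compute():

import dask.dataframe as dd

df = dd.read_csv('311_Service_Requests.csv', dtype='str')
result = df[df.Descriptor == 'RADIATOR']   # some lazy result that may not fit in memory

# write the result out in pieces rather than pulling it into the notebook
result.to_csv('radiator-requests-*.csv', index=False)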
Thank you for this post.
I am interested in whether dask is (or will become) able to handle the following issue: reading massively large datasets (>> memory) stored in multiple (HDF5) files and downsampling them (e.g. LTTB, …) for plotting at low time cost. Maybe in combination with HoloViews?
One workaround for homogeneous data is described here, but it's not very intuitive to me: https://github.com/pyviz/datashader/issues/560
Do you have any experience with that?
Hi Franz – that's a great question. I don't have any experience with that, but I'll keep an eye out for any information for you.
Thanks for the above article. I want to know how to handle NA values in dask array and dask dataframe.
It’s probably best to check out the Dask issues on github and ask over there https://github.com/dask/dask/issues