Supercharge your data processing with DuckDB

Efficient & blazing fast SQL analytics in Pandas with DuckDB
EDA
pandas
SQL
Published

May 27, 2022

Do you have large datasets that you simply cannot load into memory to analyse with Pandas? Or do you feel more comfortable expressing operations in SQL instead of Python?

Fret not, for you have DuckDB now! ✨🦆✨

Introduction

Most of us data scientists are familiar with Pandas, the de-facto data processing library in Python. It’s simple, powerful, able to handle a myriad of data formats & sizes and, for the most part, very intuitive. And those familiar with SQL know how powerful it is too, handling complex data transformations with a concise syntax. But sometimes you have to trade off execution speed against intuitiveness/expressiveness, or vice versa. This becomes especially painful when you have large (out-of-memory) datasets or need to perform complex data wrangling. At times, it makes more sense to handle that in SQL rather than in Pandas.

DuckDB aims to bring these together with a fast & efficient SQL query execution engine that can run complex queries on large sets of data. It integrates tightly with Pandas DataFrames and allows us to run these queries directly on top of them without needing to copy data in & out of the database.

In this tutorial, we shall see how we can leverage DuckDB in various analytical workloads.

Setup

DuckDB is very lightweight, has no external dependencies and runs within the host process itself. Simply install it with:

pip install duckdb==0.3.4

To initialize it, run:

import duckdb
dbcon = duckdb.connect()

That’s it! Now you can test it by running:

dbcon.execute('select 1, 2, 3').fetchall()
[(1, 2, 3)]

The next step is to run pip install pyarrow to add support for reading/writing parquet data.
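As a quick sanity check that parquet support works, you can round-trip a tiny result set through a parquet file (the file name below is arbitrary and purely illustrative):

# write a one-row result to a parquet file and read it back
dbcon.execute("copy (select 42 as answer) to 'answer.parquet' (format parquet)")
dbcon.execute("select * from 'answer.parquet'").fetchall()
# [(42,)]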

Jupyter Notebook setup

If you wish to explore it in Jupyter Notebooks, install a few additional libraries for a better experience:

pip install ipython-sql SQLAlchemy duckdb-engine

Import them once installed:

import pandas as pd
import sqlalchemy

%load_ext sql

Set a few config options to prettify the output and return it as a Pandas DataFrame:

%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False

DuckDB is primarily designed to be an in-memory DB. You can however persist your data to disk.

%sql duckdb:///:memory:
# %sql duckdb:///path/to/file.db
#collapse-output

# we can also access the current config & settings of DuckDB by running the following:
%sql SELECT * FROM duckdb_settings();
name value description input_type
0 access_mode automatic Access mode of the database (AUTOMATIC, READ_O... VARCHAR
1 checkpoint_threshold 16.7MB The WAL size threshold at which to automatical... VARCHAR
2 debug_checkpoint_abort NULL DEBUG SETTING: trigger an abort while checkpoi... VARCHAR
3 debug_force_external False DEBUG SETTING: force out-of-core computation f... BOOLEAN
4 debug_many_free_list_blocks False DEBUG SETTING: add additional blocks to the fr... BOOLEAN
5 debug_window_mode NULL DEBUG SETTING: switch window mode to use VARCHAR
6 default_collation The collation setting used when none is specified VARCHAR
7 default_order asc The order type used when none is specified (AS... VARCHAR
8 default_null_order nulls_first Null ordering used when none is specified (NUL... VARCHAR
9 disabled_optimizers DEBUG SETTING: disable a specific set of optim... VARCHAR
10 enable_external_access True Allow the database to access external state (t... BOOLEAN
11 enable_object_cache False Whether or not object cache is used to cache e... BOOLEAN
12 enable_profiling NULL Enables profiling, and sets the output format ... VARCHAR
13 enable_progress_bar False Enables the progress bar, printing progress to... BOOLEAN
14 explain_output physical_only Output of EXPLAIN statements (ALL, OPTIMIZED_O... VARCHAR
15 force_compression NULL DEBUG SETTING: forces a specific compression m... VARCHAR
16 log_query_path NULL Specifies the path to which queries should be ... VARCHAR
17 max_memory 26.9GB The maximum memory of the system (e.g. 1GB) VARCHAR
18 memory_limit 26.9GB The maximum memory of the system (e.g. 1GB) VARCHAR
19 null_order nulls_first Null ordering used when none is specified (NUL... VARCHAR
20 perfect_ht_threshold 12 Threshold in bytes for when to use a perfect h... BIGINT
21 preserve_identifier_case True Whether or not to preserve the identifier case... BOOLEAN
22 profiler_history_size NULL Sets the profiler history size BIGINT
23 profile_output The file to which profile output should be sav... VARCHAR
24 profiling_mode NULL The profiling mode (STANDARD or DETAILED) VARCHAR
25 profiling_output The file to which profile output should be sav... VARCHAR
26 progress_bar_time 2000 Sets the time (in milliseconds) how long a que... BIGINT
27 schema Sets the default search schema. Equivalent to ... VARCHAR
28 search_path Sets the default search search path as a comma... VARCHAR
29 temp_directory .tmp Set the directory to which to write temp files VARCHAR
30 threads 12 The number of total threads used by the system. BIGINT
31 wal_autocheckpoint 16.7MB The WAL size threshold at which to automatical... VARCHAR
32 worker_threads 12 The number of total threads used by the system. BIGINT
33 binary_as_string In Parquet files, interpret binary data as a s... BOOLEAN
34 Calendar gregorian The current calendar VARCHAR
35 TimeZone Europe/Berlin The current time zone VARCHAR

From now on, you can run SQL directly by prefixing %sql (or %%sql for multiline statements) to your code cell and get your output returned as a pandas DataFrame :man_dancing:.

%sql select 1 as a;
a
0 1

DuckDB vs traditional Databases

With the pandas.read_sql command, one can already run SQL queries on an existing DB connection, read tables and load data as pandas DataFrames in memory for processing in Python. While this is fine for lightweight operations, it is not optimized for heavy data processing. Traditional RDBMSs such as Postgres, MySQL, etc. process each row sequentially, which, apart from taking a long time to execute, also induces a lot of CPU overhead. DuckDB, on the other hand, is built with OLAP in mind and is column-vectorized. This helps massively parallelize disk I/O and query execution.
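For reference, the traditional route looks something like the sketch below (the connection string and table name are just placeholders): every row has to be pulled out of the database and materialized in pandas before any analysis can happen.

import pandas as pd
import sqlalchemy

# load rows from an external RDBMS into memory first, then analyse them in pandas
engine = sqlalchemy.create_engine('postgresql://user:password@localhost:5432/flightsdb')
flights = pd.read_sql('select * from flights where taxi_out > 10', engine)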

DuckDB uses the Postgres SQL parser under the hood, and offers many of the same SQL features as Postgres 1

Accessing DataFrames with DuckDB

One of the major limitations of Pandas is that it’s single-threaded. Hence we can’t take advantage of the multi-core CPUs that every computer ships with these days. DuckDB supports multi-threading natively and hence can take advantage of all the cores of your CPU.

There exist a few external libraries that enable parallel processing, but only for specific operations.

While this is turned on by default, we can also explicitly set the number of threads:

#collapse-output

# specify no. of threads for multi-threading support
# dbcon.execute("PRAGMA threads=32");

# you can also set this up with the `%sql` line magic
%sql SET THREADS to 32;
Warning

When using the free tier of Google Colab, set this to 2 instead to avoid crashing the kernel.
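If you are unsure what a safe value is, one option is to derive it from the machine itself; a minimal sketch:

import os

# default to however many cores the machine reports
n_threads = os.cpu_count() or 1

# on constrained environments such as free-tier Colab, cap it to avoid kernel crashes
# n_threads = min(n_threads, 2)

dbcon.execute(f"PRAGMA threads={n_threads};")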

We can also write a short snippet to run an SQL query, and return the result as a Pandas DataFrame in a pythonic way:

def dbrun(query: str) -> pd.DataFrame:
    """run the query with duckdb"""

    result = dbcon.query(query).to_df()

    # you can also use `fetch` instead
    # result = dbcon.execute(query).fetch_df()

    # or just simply `.df()`
    # result = dbcon.execute(query).df()

    return result

Now create a DataFrame & run a simple query:

df = pd.DataFrame(range(10), columns=['a'])

# querying with python function
first_half_df = dbrun("""
    SELECT *
    FROM df
    WHERE a < 5
""")

Alternatively, you can also compute the result with the SQL magic and store it in a Python variable that you can access elsewhere:

#collapse-output

%%sql

second_half_df << 
SELECT *
FROM df
WHERE a >= 5
Returning data to local variable second_half_df

Notice that we didn’t declare df as a view, nor did we create any table. When a table name doesn’t exist, DuckDB simply looks for a previously declared Python variable with that name.

Tip

Sometimes you overwrite a dataframe by performing an operation and assigning the result to the same variable, e.g. df = df.query(...). If you then see that DuckDB’s df is still not updated, you can always ‘register’ the dataframe explicitly with dbcon.register("df", df);
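A minimal sketch of that situation, reusing the small df from above (purely illustrative):

# re-assigning `df` creates a brand new Python object
df = df.query('a < 5')

# re-register it under the same name so DuckDB picks up the new frame
dbcon.register("df", df)
dbcon.execute('select count(*) from df').fetchall()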

Accessing files with DuckDB

Oftentimes we have data stored either in a database or in parquet/CSV files, and it could be huge, barely fitting (or not fitting at all) in memory. Loading it with pandas incurs significant memory overhead and possibly creates multiple copies when performing certain operations. With DuckDB, such files can be queried directly without any copying, thanks to its direct integration with Apache Arrow.

To demonstrate this, we’ll download an airline delay & cancellation dataset from Kaggle (and place it under the data/airline_data/ folder). The compressed archive is about ~2GB and, when deflated, yields a set of CSVs totalling ~8GB of data.

Let’s have a quick look at the contents by loading a single csv file:

%%sql

select *
from './data/airline_data/2009.csv'
limit 5
FL_DATE OP_CARRIER OP_CARRIER_FL_NUM ORIGIN DEST CRS_DEP_TIME DEP_TIME DEP_DELAY TAXI_OUT WHEELS_OFF ... CRS_ELAPSED_TIME ACTUAL_ELAPSED_TIME AIR_TIME DISTANCE CARRIER_DELAY WEATHER_DELAY NAS_DELAY SECURITY_DELAY LATE_AIRCRAFT_DELAY Unnamed: 27
0 2009-01-01 XE 1204 DCA EWR 1100 1058.0 -2.0 18.0 1116.0 ... 62.0 68.0 42.0 199.0 None None None None None None
1 2009-01-01 XE 1206 EWR IAD 1510 1509.0 -1.0 28.0 1537.0 ... 82.0 75.0 43.0 213.0 None None None None None None
2 2009-01-01 XE 1207 EWR DCA 1100 1059.0 -1.0 20.0 1119.0 ... 70.0 62.0 36.0 199.0 None None None None None None
3 2009-01-01 XE 1208 DCA EWR 1240 1249.0 9.0 10.0 1259.0 ... 77.0 56.0 37.0 199.0 None None None None None None
4 2009-01-01 XE 1209 IAD EWR 1715 1705.0 -10.0 24.0 1729.0 ... 105.0 77.0 40.0 213.0 None None None None None None

5 rows × 28 columns

We can load this efficiently with pandas too by specifying the nrows parameter:

pd.read_csv('./data/airline_data/2009.csv', nrows=5)
FL_DATE OP_CARRIER OP_CARRIER_FL_NUM ORIGIN DEST CRS_DEP_TIME DEP_TIME DEP_DELAY TAXI_OUT WHEELS_OFF ... CRS_ELAPSED_TIME ACTUAL_ELAPSED_TIME AIR_TIME DISTANCE CARRIER_DELAY WEATHER_DELAY NAS_DELAY SECURITY_DELAY LATE_AIRCRAFT_DELAY Unnamed: 27
0 2009-01-01 XE 1204 DCA EWR 1100 1058.0 -2.0 18.0 1116.0 ... 62.0 68.0 42.0 199.0 NaN NaN NaN NaN NaN NaN
1 2009-01-01 XE 1206 EWR IAD 1510 1509.0 -1.0 28.0 1537.0 ... 82.0 75.0 43.0 213.0 NaN NaN NaN NaN NaN NaN
2 2009-01-01 XE 1207 EWR DCA 1100 1059.0 -1.0 20.0 1119.0 ... 70.0 62.0 36.0 199.0 NaN NaN NaN NaN NaN NaN
3 2009-01-01 XE 1208 DCA EWR 1240 1249.0 9.0 10.0 1259.0 ... 77.0 56.0 37.0 199.0 NaN NaN NaN NaN NaN NaN
4 2009-01-01 XE 1209 IAD EWR 1715 1705.0 -10.0 24.0 1729.0 ... 105.0 77.0 40.0 213.0 NaN NaN NaN NaN NaN NaN

5 rows × 28 columns

While the first operation with DuckDB took ~250ms, the second one above took only ~50ms. This is possibly because such operations are well optimized in pandas, and for computing simple metrics across rows (such as mean, min, max, etc.) pandas can make use of highly optimized NumPy routines under the hood. But, as we’ll see later, these optimizations only go so far.

Loading all these CSVs into one dataframe with pandas is very inefficient and takes a lot of time. It will also most likely fail if your computer doesn’t have a beefy amount of RAM. If you still have to do it, you’d have to iterate through all the CSVs and concatenate them like so:

from glob import glob
csvs = glob('data/airline_data/*.csv')
df = pd.concat(pd.read_csv(csv) for csv in csvs)

A better alternative would be to merge all those CSVs into a single parquet file. Unlike CSVs, parquet files are structured and as such are unambiguous to read.2 They are compressed columnar files and hence take up much less space than the individual CSVs combined.

%sql copy 'data/airline_data/*.csv' to 'data/airline_data/data.parquet'
Count
0 61556964

Notice that we copied all the CSVs, matched with a glob pattern, directly into a single parquet file without ever loading the data into pandas. We went from ~7.5GB of combined CSV files to a 2.5GB parquet file, which now easily fits into memory. So we can load it with pandas as:

df = pd.read_parquet('data/airline_data/data.parquet')
df.shape
(61556964, 28)

Because of the efficient parquet format, we could load the entire 61 million rows of data in mere seconds.

Projection & Filter Pushdowns

Now let’s do a simple filter operation on our dataset. Let’s count the total number of rows that satisfy the condition TAXI_OUT > 10. We’ll try with both pandas & duckdb.

df[df['TAXI_OUT'] > 10].shape
(45209245, 28)
%%sql

select count(*) as count
from df
where TAXI_OUT > 10
count
0 45209245

While the pandas operation took ~9.5s, the DuckDB one took just ~250ms :zap:. There’s just no comparison.

This is because DuckDB automatically optimizes the query by selecting only the required column(s) (aka projection pushdown) and then applies the filtering to get a subset of the data (aka filter pushdown). Pandas instead reads through all the columns. We can optimize this in pandas by doing these pushdowns ourselves.

projection_pushdown_df = df[['TAXI_OUT']]
filter_pushdown_df = projection_pushdown_df[projection_pushdown_df['TAXI_OUT'] > 10]
filter_pushdown_df.shape
(45209245, 1)

We managed to bring this down from several seconds to about a second. But using DuckDB is still about 70-90% faster than this.
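If you want to see these pushdowns for yourself, you can ask DuckDB for its query plan with EXPLAIN (the exact output format varies between versions, so it is omitted here); the plan should show a scan that only touches the TAXI_OUT column:

# inspect the physical plan for the same filter query
dbcon.execute("explain select count(*) from df where TAXI_OUT > 10").fetchall()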

Using Groupby

Now let’s calculate a few aggregates using groupby with projection & filter pushdowns combined.

Here, we compute a few simple metrics for a certain airline carrier flying between two airports, grouped by origin & destination, and finally sort the results by the origin airport.

projection_df = df[['ORIGIN', 'DEST', 'TAXI_OUT', 
                    'AIR_TIME', 'DISTANCE', 'OP_CARRIER']]
origin_df = projection_df[
    (projection_df['ORIGIN'].isin(('DCA', 'EWR'))) &
    (projection_df['DEST'].isin(('DCA', 'EWR'))) &
    (projection_df['OP_CARRIER'] == 'XE')]
(origin_df
     .groupby(['ORIGIN', 'DEST'])
     .agg(
         avg_taxi_out=('TAXI_OUT', 'mean'),
         max_air_time=('AIR_TIME', 'max'),
         total_distance=('DISTANCE', 'sum'))
     .sort_index(level=0)
)
avg_taxi_out max_air_time total_distance
ORIGIN DEST
DCA EWR 22.116009 87.0 828835.0
EWR DCA 23.675481 93.0 831024.0

We can make it a bit more concise by using .query for the filter pushdown.

(df
 .query('OP_CARRIER == "XE" and ORIGIN in ("DCA", "EWR") and DEST in ("DCA", "EWR")')
 .groupby(['ORIGIN', 'DEST'])
 .agg(
     avg_taxi_out=('TAXI_OUT', 'mean'),
     max_air_time=('AIR_TIME', 'max'),
     total_distance=('DISTANCE', 'sum'))
)
avg_taxi_out max_air_time total_distance
ORIGIN DEST
DCA EWR 22.116009 87.0 828835.0
EWR DCA 23.675481 93.0 831024.0

This approach took only about half the time (~3s) compared to our earlier one, because .query uses a modified Python syntax and indexing, which results in more efficient evaluation. We can now compare that to our SQL counterpart…

%%sql

select
    ORIGIN,
    DEST,
    AVG(TAXI_OUT) as avg_taxi_out,
    MAX(AIR_TIME) as max_air_time,
    SUM(DISTANCE) as total_distance

from df

where
    OP_CARRIER = 'XE' and
    ORIGIN in ('DCA', 'EWR') and
    DEST in ('DCA', 'EWR')
    
group by ORIGIN, DEST
order by ORIGIN
ORIGIN DEST avg_taxi_out max_air_time total_distance
0 DCA EWR 22.116009 87.0 828835.0
1 EWR DCA 23.675481 93.0 831024.0

This ~400ms execution with DuckDB is around an order of magnitude faster and, I’d say, a lot cleaner too. :wink:

Notice that the data is already loaded as df, so we don’t need to read from the source parquet file.

In the same way, we can also drastically improve the performance of queries that join across multiple tables. I leave a proper benchmark as an exercise to the reader, but a tiny sketch of such a join follows.
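Here we build a small lookup DataFrame of carrier names (hand-made, purely illustrative values) and join it against our data; DuckDB resolves carriers by its variable name just like it did with df.

# a tiny hand-made lookup table of carrier codes (illustrative values only)
carriers = pd.DataFrame({
    'OP_CARRIER': ['XE', 'WN', 'DL'],
    'CARRIER_NAME': ['ExpressJet', 'Southwest', 'Delta'],
})

dbrun("""
    select c.CARRIER_NAME, count(*) as num_flights
    from df
    join carriers c using (OP_CARRIER)
    group by 1
    order by 2 desc
""")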

But why load the data into memory in the first place when we can process it just as efficiently straight from disk? Oftentimes, the data is too big to load into memory anyway.

To do that, we just need to create a VIEW over our data, which lets us query it directly without loading it into memory, and change the source of our query from the dataframe df to the newly created view instead.3

Accessing data with Views

%%sql

create view airlinedata as select * from 'data/airline_data/data.parquet';

select
    ORIGIN,
    DEST,
    AVG(TAXI_OUT) as avg_taxi_out,
    MAX(AIR_TIME) as max_air_time,
    SUM(DISTANCE) as total_distance

from airlinedata

where
    OP_CARRIER = 'XE' and
    ORIGIN in ('DCA', 'EWR') and
    DEST in ('DCA', 'EWR')

-- you can also use `ALL` instead of each column
group by ALL
order by ORIGIN
ORIGIN DEST avg_taxi_out max_air_time total_distance
0 DCA EWR 22.116009 87.0 828835.0
1 EWR DCA 23.675481 93.0 831024.0

This is such a powerful feature and allows us to explore a lot more data than is traditionally possible with pandas.

We can also check the data types and other metadata with describe <view/table name>:

#collapse-output

%sql describe airlinedata
column_name column_type null key default extra
0 FL_DATE DATE YES None None None
1 OP_CARRIER VARCHAR YES None None None
2 OP_CARRIER_FL_NUM INTEGER YES None None None
3 ORIGIN VARCHAR YES None None None
4 DEST VARCHAR YES None None None
5 CRS_DEP_TIME INTEGER YES None None None
6 DEP_TIME DOUBLE YES None None None
7 DEP_DELAY DOUBLE YES None None None
8 TAXI_OUT DOUBLE YES None None None
9 WHEELS_OFF DOUBLE YES None None None
10 WHEELS_ON DOUBLE YES None None None
11 TAXI_IN DOUBLE YES None None None
12 CRS_ARR_TIME INTEGER YES None None None
13 ARR_TIME DOUBLE YES None None None
14 ARR_DELAY DOUBLE YES None None None
15 CANCELLED DOUBLE YES None None None
16 CANCELLATION_CODE VARCHAR YES None None None
17 DIVERTED DOUBLE YES None None None
18 CRS_ELAPSED_TIME DOUBLE YES None None None
19 ACTUAL_ELAPSED_TIME DOUBLE YES None None None
20 AIR_TIME DOUBLE YES None None None
21 DISTANCE DOUBLE YES None None None
22 CARRIER_DELAY DOUBLE YES None None None
23 WEATHER_DELAY DOUBLE YES None None None
24 NAS_DELAY DOUBLE YES None None None
25 SECURITY_DELAY DOUBLE YES None None None
26 LATE_AIRCRAFT_DELAY DOUBLE YES None None None
27 Unnamed: 27 VARCHAR YES None None None

Using approximations

At times, it suffices to get an estimate over the data rather than a precise answer. Using approximations helps us do just that.

%%sql

select
    OP_CARRIER,
    approx_count_distinct(DEST) as approx_num_unique_destinations

from airlinedata

group by 1
order by 1

limit 10
OP_CARRIER approx_num_unique_destinations
0 9E 186
1 AA 116
2 AS 77
3 B6 73
4 CO 85
5 DL 171
6 EV 205
7 F9 130
8 FL 75
9 G4 126
%%sql

select
    OP_CARRIER,
    -- takes more time to compute
    count(distinct DEST) as num_unique_destinations

from airlinedata

group by 1
order by 1

limit 10
OP_CARRIER num_unique_destinations
0 9E 185
1 AA 116
2 AS 77
3 B6 73
4 CO 85
5 DL 170
6 EV 205
7 F9 129
8 FL 75
9 G4 126

Our approximation query above ran about 3-4 times faster than the precise one in this case. This is crucial when responsiveness is more important than precision (especially for larger datasets).
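approx_count_distinct isn’t the only approximate aggregate; for instance, approx_quantile gives a quick percentile estimate. A sketch against the same view, using the DEP_DELAY column we saw in the schema earlier:

%%sql

select
    OP_CARRIER,
    approx_quantile(DEP_DELAY, 0.9) as approx_p90_dep_delay

from airlinedata

group by 1
order by 1

limit 5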

Using Window functions

Finally, let’s wrap up our analysis by showing off a bit more of what DuckDB can do, using some advanced SQL operations.

We create two CTEs (Common Table Expressions) to calculate a couple of features: we do the filter & projection pushdowns in one CTE and compute our desired features in the other. The first feature is a simple demo of if-else support. The second is a bit more advanced: for each flight, we find the last destination the carrier had flown to, ordered by flight date, and replace it with NA when there is none. We then take a sample from the final result set.

%%sql

with limited_data as (
    select 
        FL_DATE,
        ORIGIN, 
        DEST, 
        DISTANCE,
        OP_CARRIER,
    from airlinedata
    where FL_DATE >= '2015-01-01'    
),

last_destination_data as (
    select *,
        case
            when DISTANCE*1.60934 > 500 then 'yes'
            else 'no'
        end as distance_more_than_500_km,

        coalesce(last_value(DEST) over (
            partition by OP_CARRIER
            order by FL_DATE
            rows between unbounded preceding and 1 preceding
        ), 'NA') as last_destination_flown_with_this_carrier

    from limited_data
)

select *
from last_destination_data
using sample 10;
FL_DATE ORIGIN DEST DISTANCE OP_CARRIER distance_more_than_500_km last_destination_flown_with_this_carrier
0 2018-07-10 DCA LGA 214.0 YX no DCA
1 2015-05-08 DAL BWI 1209.0 WN yes BWI
2 2018-03-30 LAS SJC 386.0 WN yes SJC
3 2015-07-10 BOS MSP 1124.0 DL yes DTW
4 2016-06-01 DTW BWI 409.0 DL yes DTW
5 2017-07-26 GEG MSP 1175.0 DL yes SAN
6 2017-01-10 DFW ACT 89.0 EV no DFW
7 2015-01-01 PHX DRO 351.0 OO yes BFL
8 2018-05-06 DFW MCO 985.0 AA yes IAH
9 2018-04-30 MSY LAX 1670.0 WN yes LAX

Nice, isn’t it?! The same operation is unimaginably complex (for me, at least) in pandas. 🤯

With DuckDB, we can combine many such complex operations and execute them in one go without worrying much about manual optimizations.

Conclusion

We have seen how performant DuckDB is and how it brings the whole SQL ecosystem into Pandas. Its simple installation and light footprint mean that we can integrate it into our existing workflows with minimal effort and achieve maximum gains in execution speed. We can also continue using pandas on larger datasets (to a reasonable extent) without loading them into memory or jumping onto a full-blown distributed computing setup.

Thanks so much for reading this far :heart:. Now you know most of the important stuff about DuckDB to get yourself started. Bear in mind that we have hardly scratched the surface; DuckDB offers so much more. You can use correlated subqueries, nested types, etc., apart from its many user-friendly features such as column aliasing in GROUP BY/HAVING, auto-incrementing duplicate column names, better string slicing and so on. Their documentation is very clean & beginner-friendly, and the blog has very informative posts as well. I encourage you to check those out.

Sidenote

  • Modin tries to parallelize pandas workflows by distributing the workload across multiple CPU cores. Have a look; I’m guessing we can also combine Modin & DuckDB for even faster runtimes.
  • Fugue is a new framework that provides a unified interface so that users can execute their Python, Pandas, and SQL code on various distributed computing platforms such as Spark and Dask without rewrites. Please check here for a nice introduction from its maintainer Kevin, and also check out this article from Khuyen to get a feel for using it with DuckDB.

I hope you have enjoyed this post and learnt something from it. Please let me know your thoughts/suggestions (or any mistakes) in the comments below. :)

Happy Ducking! 💚🦆💚

Footnotes


  1. SQL on Pandas with duckdb↩︎

  2. duckdb on parquet↩︎

  3. The exact execution times might vary a bit depending on the load & build of your computer. I also noticed that the operations are cached and the first computation takes a bit of time but running it again or after changing the values of the columns in the WHERE clause would only take a couple of ms later on.↩︎