Data ingestion with dlt Workshop

Data ingestion with dlt (data load tool) Workshop

Introduction

  • Data ingestion is the process of extracting data from a producer, transporting it to a convenient environment, and preparing it for usage by normalising it, sometimes cleaning, and adding metadata.
  • Here’s what you need to learn to build pipelines:
    • Extracting data
    • Normalising, cleaning, and adding metadata such as schema and types
    • Incremental loading, which is vital for fast, cost-effective data refreshes

What is dlt?

dlt is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets. To get started, install it with:

pip install dlt

  • Unlike other solutions, with dlt, there’s no need to use any backends or containers. Simply import dlt in a Python file or a Jupyter Notebook cell, and create a pipeline to load data into any of the supported destinations. You can load data from any source that produces Python data structures, including APIs, files, databases, and more.
  • The library will create or update tables, infer data types and handle nested data automatically. Here are a few example pipelines:
    • Data from an API
    • Data from a dlt Source
    • Data from CSV/XLS/Pandas
    • Data from a Database

To make it work with the DuckDB destination, you’ll need to install the duckdb dependency (the default dlt installation is really minimal):

pip install "dlt[duckdb]"

Why use dlt?

  • Automated maintenance - with schema inference and evolution and alerts, and with short declarative code, maintenance becomes simple.
  • Run it where Python runs - on Airflow, serverless functions, notebooks. No external APIs, backends or containers, scales on micro and large infra alike.
  • User-friendly, declarative interface that removes knowledge obstacles for beginners while empowering senior professionals.

Benefits: For a data engineer, dlt offers several benefits:
  • Efficient Data Extraction and Loading: dlt simplifies the process of extracting and loading data. It allows you to decorate your data-producing functions with loading or incremental extraction metadata, enabling dlt to extract and load data according to your custom logic. This is particularly useful when dealing with large datasets, as dlt supports scalability through iterators, chunking, and parallelization.
  • Automated Schema Management: dlt automatically infers a schema from data and loads the data to the destination. It can easily adapt and structure data as it evolves, reducing the time spent on maintenance and development. This ensures data consistency and quality.
  • Data Governance Support: dlt pipelines offer robust governance support through pipeline metadata utilization, schema enforcement and curation, and schema change alerts. This promotes data consistency, traceability, and control throughout the data processing lifecycle.
  • Flexibility and Scalability: dlt can be used on Airflow, serverless functions, notebooks, and scales on both micro and large infrastructures. It also offers several mechanisms and configuration options to scale up and fine-tune pipelines.
  • Post-Loading Transformations: dlt provides several options for transformations after loading the data, including using dbt, the dlt SQL client, or Pandas. This lets you shape and manipulate the data before or after loading it, so you can meet specific requirements and ensure data quality and consistency.

Part 1: Data Extraction

Most data is stored behind an API

  • Sometimes that’s a RESTful API for some business application, returning records of data.
  • Sometimes the API returns a secure file path to something like a JSON or Parquet file in a bucket that enables you to grab the data in bulk.
  • Sometimes the API is something else (MongoDB, SQL, other databases or applications) and will generally return records as JSON, the most common interchange format.

As an engineer, you will need to build pipelines that “just work”.

So here’s what you need to consider on extraction, to prevent the pipelines from breaking, and to keep them running smoothly.

  • Hardware limits: the challenges of managing memory.
  • Network limits: Sometimes networks can fail. We can’t fix what could go wrong but we can retry network jobs until they succeed. For example, the dlt library offers a requests “replacement” that has built-in retries.
  • Source API limits: Each source might have some limits, such as how many requests you can make per second. We call these “rate limits”. Read each source’s docs carefully to understand how to navigate these obstacles.
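
The "retry until it succeeds" idea can be sketched with the standard library alone. This is not dlt's requests replacement: with_retries, max_attempts, base_delay and flaky_fetch are hypothetical names chosen for illustration, and exponential backoff is just one common way to space out the attempts.

```python
import time


def with_retries(func, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Call func() until it succeeds, waiting longer after each failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up after the last attempt
            # exponential backoff: base_delay, then 2x, then 4x, ...
            sleep(base_delay * 2 ** (attempt - 1))


# A stand-in for a flaky network call (hypothetical): fails twice, then succeeds.
calls = {"count": 0}


def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary network failure")
    return {"status": "ok"}


# The sleep function is swapped out so the demo does not actually wait.
result = with_retries(flaky_fetch, sleep=lambda seconds: None)
print(result, "after", calls["count"], "attempts")
```

The same wrapper is also a natural place to respect rate limits, for example by sleeping between successful calls, if the source's docs ask for it.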

Managing memory

  • Many data pipelines run on serverless functions or on orchestrators that delegate the workloads to clusters of small workers.
  • These systems have a small memory or share it between multiple workers - so filling the memory is BAAAD: It might lead to not only your pipeline crashing, but crashing the entire container or machine that might be shared with other worker processes, taking them down too.
  • The same can be said about disk - in most cases your disk is sufficient, but in some cases it’s not. For those cases, mounting an external drive mapped to a storage bucket is the way to go.

So how do we avoid filling the memory?

  • We often do not know the volume of data upfront
  • And we cannot scale dynamically or infinitely on hardware during runtime
  • So the answer is: Control the max memory you use

Control the max memory used by streaming the data

Streaming here refers to processing the data event by event or chunk by chunk instead of doing bulk operations. Some classic examples of streaming, where data is transferred chunk by chunk or event by event:

  • Between Instagram Live and your followers
  • Between a server and a local video player
  • Between an audio broadcaster and an in-browser audio player
  • Between Google Maps and your navigation app

Streaming in Python via generators

In Python, a generator is a function that returns an iterator that produces a sequence of values when iterated over. Generators are useful when we want to produce a large sequence of values, but we don’t want to store all of them in memory at once.

  • To process data in a stream in Python, we use generators, which are functions that can return multiple times. By allowing multiple returns, the data can be released as it’s produced, as a stream, instead of returning it all at once as a batch.

A regular function collects data in memory. Here you can see how data is collected row by row in a list called data before it is returned. This will break if we have more data than memory.

def search_twitter(query):
    data = []
    # paginated_get is a placeholder for a function that returns rows page by page
    for row in paginated_get(query):
        data.append(row)
    return data

# Collect all the cat picture data
for row in search_twitter("cat pictures"):
    # Once collected, print row by row
    print(row)

When calling for row in search_twitter("cat pictures"): all the data must first be downloaded before the first record is returned.

Generator for streaming the data. The memory usage here is minimal.

As you can see, in the modified function, we yield each row as we get the data, without collecting it into memory. We can then run this generator and handle the data item by item.

def extract_data(query):
    # paginated_get is a placeholder for a function that returns rows page by page
    for row in paginated_get(query):
        yield row

# Get one row at a time
for row in extract_data("cat pictures"):
    # print the row
    print(row)
    # do something with the row such as cleaning it and writing it to a buffer
    # continue requesting and printing data

When calling for row in extract_data("cat pictures"): the function only runs until the first data item is yielded, before printing, so we do not need to wait long for the first value. It will then continue until there is no more data to get.

If we wanted to get all the values at once from a generator instead of one by one, we would need to first “run” the generator and collect the data. For example, if we wanted to get all the data in memory we could do data = list(extract_data("cat pictures")), which would run the generator and collect all the data in a list before continuing.

We used 3 ways to download the same data:

  • Extracting API data with a generator
  • The “bad” way to download a file
  • Extracting file data with a generator “the best practice way”
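
As a rough illustration of extracting file data with a generator, the sketch below parses a JSON Lines file one line at a time. It assumes the file holds one JSON object per line; stream_jsonl is a hypothetical helper name, and an in-memory file object with made-up rows stands in for a real download so the example stays self-contained.

```python
import io
import json


def stream_jsonl(file_obj):
    """Yield one parsed record per non-empty line, never holding the whole file."""
    for line in file_obj:
        line = line.strip()
        if line:
            yield json.loads(line)


# An in-memory stand-in for a large downloaded file (made-up rows).
fake_file = io.StringIO(
    '{"vendor_name": "VTS", "trip_distance": 17.52}\n'
    '{"vendor_name": "VTS", "trip_distance": 1.56}\n'
)

rows = stream_jsonl(fake_file)
first = next(rows)      # only the first line has been parsed so far
remaining = list(rows)  # consume the rest of the stream
print(first, len(remaining))
```

With a real HTTP download the file object would be replaced by a streamed response that is iterated line by line, which keeps memory use bounded by the size of one line rather than the size of the file.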

Let’s use the fast and reliable way to load some data and inspect it in DuckDB. We are using the dlt library to do the loading, which will process data from the generators incrementally, following the same memory management paradigm.

LOAD
import dlt

# Define the connection to load to.
# We use DuckDB now, but you can switch to BigQuery later.
generators_pipeline = dlt.pipeline(destination="duckdb", dataset_name="generators")

# paginated_getter, stream_download_jsonl and url are assumed to be the
# generators and file URL from the extraction examples (not shown in this section).

# We can load any generator to a table at the pipeline destination as follows:
info = generators_pipeline.run(
    paginated_getter(),
    table_name="http_download",
    write_disposition="replace",
)

# The outcome metadata is returned by the load and we can inspect it by printing it.
print(info)

# We can load the next generator to the same or to a different table.
info = generators_pipeline.run(
    stream_download_jsonl(url),
    table_name="stream_download",
    write_disposition="replace",
)

print(info)

   Pipeline dlt_colab_kernel_launcher load step completed in 1.50 seconds
1 load package(s) were loaded to destination duckdb and into dataset generators
The duckdb destination used duckdb:////content/dlt_colab_kernel_launcher.duckdb location to store data
Load package 1707149342.0762298 is LOADED and contains no failed jobs
Pipeline dlt_colab_kernel_launcher load step completed in 1.44 seconds
1 load package(s) were loaded to destination duckdb and into dataset generators
The duckdb destination used duckdb:////content/dlt_colab_kernel_launcher.duckdb location to store data
Load package 1707149369.8628578 is LOADED and contains no failed jobs
QUERY THE DATA
# show outcome
import duckdb

# display() is the notebook helper; use print() instead in a plain script
conn = duckdb.connect(f"{generators_pipeline.pipeline_name}.duckdb")
# let's see the tables
conn.sql(f"SET search_path = '{generators_pipeline.dataset_name}'")
print("Loaded tables: ")
display(conn.sql("show tables"))
# and the data
print("\n\n\n http_download table below:")
rides = conn.sql("SELECT * FROM http_download").df()
display(rides)
print("\n\n\n stream_download table below:")
passengers = conn.sql("SELECT * FROM stream_download").df()
display(passengers)
# As you can see, the same data was loaded in both cases.

SAMPLE OUTPUT
Loaded tables:
┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ http_download       │
│ stream_download     │
└─────────────────────┘

 http_download table below:
end_lat	end_lon	fare_amt	passenger_count	payment_type	start_lat	start_lon	tip_amt	tolls_amt	total_amt	trip_distance	trip_dropoff_date_time	trip_pickup_date_time	surcharge	vendor_name	_dlt_load_id	_dlt_id	store_and_forward
0	40.742963	-73.980072	45.0	1	Credit	40.641525	-73.787442	9.0	4.15	58.15	17.52	2009-06-14 23:48:00+00:00	2009-06-14 23:23:00+00:00	0.0	VTS	1707149342.0762298	TMFOk/TVI/OneQ	NaN
1	40.740187	-74.005698	6.5	1	Credit	40.722065	-74.009767	1.0	0.00	8.50	1.56	2009-06-18 17:43:00+00:00	2009-06-18 17:35:00+00:00	1.0	VTS	1707149342.0762298	SJ8vsEQWR9NsvA	NaN
2	40.718043	-74.004745	12.5	5	Credit	40.761945	-73.983038	2.0	0.00	15.50	3.37	2009-06-10 18:27:00+00:00	2009-06-10 18:08:00+00:00	1.0	VTS	1707149342.0762298	ECr1xFFMENZ49g	NaN
3	40.739637	-73.985233	4.9	1	CASH	40.749802	-73.992247	0.0	0.00	5.40	1.11	2009-06-14 23:58:00+00:00	2009-06-14 23:54:00+00:00	0.5	VTS	1707149342.0762298	WJ16OJ7NvsTDLA	NaN
..............................

stream_download table below:
vendor_name	trip_pickup_date_time	trip_dropoff_date_time	passenger_count	trip_distance	start_lon	start_lat	end_lon	end_lat	payment_type	fare_amt	surcharge	tip_amt	tolls_amt	total_amt	_dlt_load_id	_dlt_id	store_and_forward
0	VTS	2009-06-14 23:23:00+00:00	2009-06-14 23:48:00+00:00	1	17.52	-73.787442	40.641525	-73.980072	40.742963	Credit	45.0	0.0	9.0	4.15	58.15	1707149369.8628578	JnbfJ0jwLQC7Bg	NaN
1	VTS	2009-06-18 17:35:00+00:00	2009-06-18 17:43:00+00:00	1	1.56	-74.009767	40.722065	-74.005698	40.740187	Credit	6.5	1.0	1.0	0.00	8.50	1707149369.8628578	GbdfwgfDuU8oVA	NaN
2	VTS	2009-06-10 18:08:00+00:00	2009-06-10 18:27:00+00:00	5	3.37	-73.983038	40.761945	-74.004745	40.718043	Credit	12.5	1.0	2.0	0.00	15.50	1707149369.8628578	t+dAetvDHmQ8Ow	NaN
3	VTS	2009-06-14 23:54:00+00:00	2009-06-14 23:58:00+00:00	1	1.11	-73.992247	40.749802	-73.985233	40.739637	CASH	4.9	0.5	0.0	0.00	5.40	1707149369.8628578	wXYEl9kyoOtX7w	NaN
..............................
dlt Verified Sources
  • Airtable, Amazon Kinesis, Arrow Table / Pandas, Asana, Chess.com, Facebook Ads, Filesystem, GitHub, Google Analytics, Google Sheets, Hubspot, Mail Inbox, Jira, Confluent Kafka, Matomo, MongoDB, Mux, Notion, Personio, Pipedrive, Salesforce, Shopify, 30+ SQL Databases, Slack, Strapi API, Stripe API, Workable, Zendesk API.

dlt Destinations
  • Google BigQuery, Databricks, DuckDB, Microsoft SQL Server, Azure Synapse, Filesystem & buckets, Postgres, Amazon Redshift, Snowflake, AWS Athena / Glue Catalog, 🧪 MotherDuck, Weaviate, Qdrant.

Part 2: Normalization

Let’s look granularly into what people consider data cleaning.

Usually we have 2 parts:

  • Normalising data without changing its meaning,
  • and filtering data for a use case, which changes its meaning.

Part of what we often call data cleaning is just metadata work:

  • Add types (string to number, string to timestamp, etc)
  • Rename columns: Ensure column names follow a supported standard downstream - such as no strange characters in the names.
  • Flatten nested dictionaries: Bring nested dictionary values into the top dictionary row
  • Unnest lists or arrays into child tables: Arrays or lists cannot be flattened into their parent record, so if we want flat data we need to break them out into separate tables.

Why prepare data? Why not use JSON as is?

  • We do not easily know what is inside a JSON document due to the lack of a schema.
  • Types are not enforced between rows of JSON: we could have one record where age is 25, another where age is twenty five, and another where it’s 25.00. Or in some systems, you might have a dictionary for a single record, but a list of dicts for multiple records. This could easily lead to applications downstream breaking.
  • We cannot just use JSON data easily. For example, we would need to convert strings to timestamps if we want to do a daily aggregation.
  • Reading JSON loads more data into memory, as the whole document is scanned, while in Parquet or databases we can scan a single column of a document. This causes costs and slowness.
  • JSON is not fast to aggregate; columnar formats are.
  • JSON is not fast to search.
  • Basically, JSON is designed as a “lowest common denominator format” for “interchange” / data transfer and is unsuitable for direct analytical usage.
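
To make the point about types concrete, here is a small sketch of the daily aggregation case. The records are made up (loosely modelled on the taxi data used in this workshop), and the field names pickup and fare are assumptions for illustration. The timestamps arrive as strings, so they have to be parsed before they can be grouped by day.

```python
from collections import defaultdict
from datetime import datetime

# Made-up records: timestamps arrive as strings, as they would in JSON.
records = [
    {"pickup": "2009-06-14T23:23:00+00:00", "fare": 45.0},
    {"pickup": "2009-06-14T23:54:00+00:00", "fare": 4.9},
    {"pickup": "2009-06-18T17:35:00+00:00", "fare": 6.5},
]

daily_fares = defaultdict(float)
for record in records:
    # Convert the string to a real datetime before grouping by day.
    picked_up = datetime.fromisoformat(record["pickup"])
    daily_fares[picked_up.date().isoformat()] += record["fare"]

print(dict(daily_fares))
```

Once the data sits in a database with a proper timestamp column, this parsing step disappears and the aggregation becomes a plain GROUP BY.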

When going from JSON to a database, we want some things standardised:

  • Data types such as timestamps should be detected correctly
  • Column names should be converted to db-compatible names
  • Unnested sub-tables should be linked to parent tables via auto generated keys

When converting nested data to tabular formats, to keep fragmentation minimal:

  • Nested dictionaries can be flattened into the parent row
  • Nested lists however need to be expressed as separate tables due to the different granularity (1:n relationship)

nested dictionaries

   "coordinates": {
       "start": {
           "lon": -73.787442,
           "lat": 40.641525
       },

nested lists

   "passengers": [
       {"name": "John", "rating": 4.9},
       {"name": "Jack", "rating": 3.9}
   ],
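
The two rules above (flatten nested dictionaries, break nested lists out into child tables) can be sketched by hand in a few lines. This is only an illustration of the idea, not how dlt implements it: normalise is a hypothetical function, the _id and _parent_id columns are simple counters standing in for generated keys, and the double-underscore naming is borrowed from the column names dlt produces.

```python
import itertools

_ids = itertools.count(1)  # simple stand-in for generated row ids


def normalise(record, table, tables, parent_id=None):
    """Flatten nested dicts into the row; move nested lists into child tables."""
    row = {"_id": next(_ids)}
    if parent_id is not None:
        row["_parent_id"] = parent_id

    def flatten(value, prefix):
        for key, item in value.items():
            name = f"{prefix}__{key}" if prefix else key
            if isinstance(item, dict):
                flatten(item, name)  # same granularity: stays in this row
            elif isinstance(item, list):
                for child in item:  # 1:n relationship: goes to a child table
                    child = child if isinstance(child, dict) else {"value": child}
                    normalise(child, f"{table}__{name}", tables, row["_id"])
            else:
                row[name] = item

    flatten(record, "")
    tables.setdefault(table, []).append(row)
    return tables


ride = {
    "vendor_name": "VTS",
    "coordinates": {"start": {"lon": -73.787442, "lat": 40.641525}},
    "passengers": [
        {"name": "John", "rating": 4.9},
        {"name": "Jack", "rating": 3.9},
    ],
}

tables = normalise(ride, "rides", {})
print(tables["rides"])
print(tables["rides__passengers"])
```

The parent row ends up with a coordinates__start__lon column, while each passenger becomes a row in rides__passengers that points back to its parent through _parent_id.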

dlt automates much of the tedious work a data engineer would do, and does it in a way that is robust. dlt can handle things like:

  • Schema: Inferring and evolving the schema, alerting on changes, using schemas as data contracts.
  • Typing data, flattening structures, renaming columns to fit database standards. In our example we will pass the “data” you can see above and see it normalised.
  • Processing a stream of events/rows without filling memory. This includes extraction from generators.
  • Loading to a variety of dbs or file formats.

Let’s look at what happened during the load

  • By looking at the loaded tables, we can see our JSON document got flattened and sub-documents got split into separate tables.
  • We can re-join those child tables to the parent table by using the generated keys:
   on parent_table._dlt_id = child_table._dlt_parent_id
  • Data types: the timestamps, which in JSON are of string type, are now of timestamp type in the db.

joined = conn.sql("""
SELECT *
FROM rides AS r
LEFT JOIN rides__passengers AS rp
  ON r._dlt_id = rp._dlt_parent_id
LEFT JOIN rides__stops AS rs
  ON r._dlt_id = rs._dlt_parent_id
""").df()
display(joined)


joined table
record_hash	vendor_name	time__pickup	time__dropoff	trip_distance	coordinates__start__lon	coordinates__start__lat	coordinates__end__lon	coordinates__end__lat	payment__type	...	_dlt_root_id	_dlt_parent_id	_dlt_list_idx	_dlt_id_2	lon	lat	_dlt_root_id_2	_dlt_parent_id_2	_dlt_list_idx_2	_dlt_id_3
0	b00361a396177a9cb410ff61f20015ad	VTS	2009-06-14 23:23:00+00:00	2009-06-14 23:48:00+00:00	17.52	-73.787442	40.641525	-73.980072	40.742963	Credit	...	NxIknICvlm3HXg	NxIknICvlm3HXg	1	xPwvjKWL3RFfuQ	-73.5	40.5	NxIknICvlm3HXg	NxIknICvlm3HXg	1	on55pVMrw4OKVA
1	b00361a396177a9cb410ff61f20015ad	VTS	2009-06-14 23:23:00+00:00	2009-06-14 23:48:00+00:00	17.52	-73.787442	40.641525	-73.980072	40.742963	Credit	...	NxIknICvlm3HXg	NxIknICvlm3HXg	0	A9D8DMPP/ptD2w	-73.5	40.5	NxIknICvlm3HXg	NxIknICvlm3HXg	1	on55pVMrw4OKVA
2	b00361a396177a9cb410ff61f20015ad	VTS	2009-06-14 23:23:00+00:00	2009-06-14 23:48:00+00:00	17.52	-73.787442	40.641525	-73.980072	40.742963	Credit	...	NxIknICvlm3HXg	NxIknICvlm3HXg	1	xPwvjKWL3RFfuQ	-73.6	40.6	NxIknICvlm3HXg	NxIknICvlm3HXg	0	nMaSxPM7j5ER1Q
3	b00361a396177a9cb410ff61f20015ad	VTS	2009-06-14 23:23:00+00:00	2009-06-14 23:48:00+00:00	17.52	-73.787442	40.641525	-73.980072	40.742963	Credit	...	NxIknICvlm3HXg	NxIknICvlm3HXg	0	A9D8DMPP/ptD2w	-73.6	40.6	NxIknICvlm3HXg	NxIknICvlm3HXg	0	nMaSxPM7j5ER1Q

What are we looking at?

  • Nested dicts got flattened into the parent row: for example, the lat value nested under coordinates and start became the column coordinates__start__lat.
  • Nested lists got broken out into separate tables with generated columns that would allow us to join the data back when needed.

Part 3: Incremental loading

Incremental loading means that as we update our datasets with the new data, we would only load the new data, as opposed to making a full copy of a source’s data all over again and replacing the old version.

By loading incrementally, our pipelines run faster and cheaper.

  • Incremental loading goes hand in hand with incremental extraction and state
    • State is information that keeps track of what was loaded, to know what else remains to be loaded. dlt stores the state at the destination in a separate table.
    • Incremental extraction refers to only requesting the increment of data that we need, and not more. This is tightly connected to the state to determine the exact chunk that needs to be extracted and loaded.

The challenge of incremental pipelines is keeping track of the state of the load (i.e. which increments were loaded and which are still to be loaded): without it, we cannot know which data to request next.
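
How state and incremental extraction fit together can be sketched in plain Python. This is not dlt's incremental API: extract_incrementally, the state dictionary and the updated_at cursor field are hypothetical, and the filtering happens on the client here, whereas a real incremental extraction would pass the stored cursor to the source so that only the increment is requested.

```python
def extract_incrementally(source_rows, state, cursor_field="updated_at"):
    """Yield only rows newer than the stored cursor, then advance the cursor."""
    last_value = state.get("last_value")
    newest = last_value
    for row in source_rows:
        value = row[cursor_field]
        if last_value is None or value > last_value:
            if newest is None or value > newest:
                newest = value
            yield row
    state["last_value"] = newest  # remembered for the next run


state = {}  # in a real pipeline this would be persisted between runs
source = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-02"},
]
first_run = list(extract_incrementally(source, state))

# A new row appears at the source; the second run only picks up that row.
source.append({"id": 3, "updated_at": "2024-01-03"})
second_run = list(extract_incrementally(source, state))
print(len(first_run), len(second_run), state)
```

Note that the cursor is only advanced once the generator has been fully consumed, so a run that fails halfway does not mark unseen rows as loaded.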

Choosing a write disposition

The 3 write dispositions:
  • Full load: replaces the destination dataset with whatever the source produced on this run. To achieve this, use write_disposition='replace' in your resources.
  • Append: appends the new data to the destination. Use write_disposition='append'.
  • Merge: merges new data to the destination using merge_key and/or deduplicates/upserts new data using primary_key. Use write_disposition='merge'.

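
The difference between the first two dispositions is easy to see on a toy in-memory destination. The load function and the destination dictionary below are hypothetical stand-ins, not dlt code; merge is left out of this sketch because it additionally needs a key to match records on.

```python
def load(destination, table_name, rows, write_disposition="append"):
    """Apply a write disposition to an in-memory 'destination' (a dict of lists)."""
    if write_disposition == "replace":
        destination[table_name] = list(rows)  # drop whatever was there
    elif write_disposition == "append":
        destination.setdefault(table_name, []).extend(rows)
    else:
        raise ValueError(f"unsupported in this sketch: {write_disposition}")
    return destination


destination = {}
load(destination, "rides", [{"id": 1}, {"id": 2}], "append")
load(destination, "rides", [{"id": 2}, {"id": 3}], "append")
appended = len(destination["rides"])  # the row with id 2 is now stored twice

load(destination, "rides", [{"id": 2}, {"id": 3}], "replace")
replaced = len(destination["rides"])  # only the rows from the last run remain
print(appended, replaced)
```

Append is cheap but will happily store the same record twice, while replace is always consistent with the source but reloads everything on each run.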
Merge incremental loading

The merge write disposition is used in two scenarios:

  1. You want to keep only one instance of a certain record, e.g. you receive updates of the user state from an API and want to keep just one record per user_id.
  2. You receive data in daily batches, and you want to make sure that you always keep just a single instance of a record for each batch, even if you load an old batch or load the current batch several times a day (e.g. to receive “live” updates).

The merge write disposition loads data to a staging dataset, deduplicates the staging data if a primary_key is provided, deletes the data from the destination using merge_key and primary_key, and then inserts the new records. All of this happens in a single atomic transaction for a parent and all child tables.
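
The steps described above (stage, deduplicate, delete, insert) can be mimicked on plain Python lists. This is a simplified sketch rather than dlt's implementation: merge is a hypothetical function, it only handles a primary key on a single table, it ignores transactions and child tables, and "the last staged row wins" is an assumption made here for the deduplication step.

```python
def merge(destination_rows, new_rows, primary_key):
    """Upsert new_rows into destination_rows, keeping one row per primary key."""
    # 1. stage the new data, deduplicating on the primary key (last one wins)
    staged = {row[primary_key]: row for row in new_rows}
    # 2. delete destination rows whose key appears in the staged data
    kept = [row for row in destination_rows if row[primary_key] not in staged]
    # 3. insert the staged rows
    return kept + list(staged.values())


# Made-up rows keyed by a hypothetical record_hash value.
before = [
    {"record_hash": "a1", "rating": 4.9},
    {"record_hash": "b2", "rating": 3.9},
]
update = [
    {"record_hash": "a1", "rating": 4.4},
    {"record_hash": "a1", "rating": 4.4},  # duplicate within the same batch
]

after = merge(before, update, "record_hash")
print(after)
```

The untouched record stays as it was, the updated record appears exactly once with its new value, and loading the same batch again would leave the table unchanged.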

Example: the scores of the 2 passengers changed. It turns out their payment for the previous ride didn’t go through and they got a bad rating from the driver, so now we have to update their ratings.

As you can see after running the code, their ratings are now lower.

import dlt

pipeline = dlt.pipeline(destination="duckdb", dataset_name="taxi_rides")

# `data` holds the updated ride records, including the new passenger ratings.
# Run the pipeline with the merge disposition and capture the outcome.
info = pipeline.run(
    data,
    table_name="rides",
    write_disposition="merge",
    primary_key="record_hash",
)

Passengers table before merge
name	rating	_dlt_root_id	_dlt_parent_id	_dlt_list_idx	_dlt_id
0	John	4.9	NxIknICvlm3HXg	NxIknICvlm3HXg	0	A9D8DMPP/ptD2w
1	Jack	3.9	NxIknICvlm3HXg	NxIknICvlm3HXg	1	xPwvjKWL3RFfuQ



Passengers table after merge
name	rating	_dlt_root_id	_dlt_parent_id	_dlt_list_idx	_dlt_id
0	John	4.4	SpBPz6CiHyz+1A	SpBPz6CiHyz+1A	0	j06fU+ylaR47nQ
1	Jack	3.6	SpBPz6CiHyz+1A	SpBPz6CiHyz+1A	1	7HZIZPxaw5t4rA

Source : https://dlthub.com/docs/general-usage/incremental-loading