Data Engineering Zoomcamp, Module 1: Data Ingestion with Docker
Data Engineering Zoomcamp, Week 1
2024 Cohort
- Starts: 15 January 2024 (Monday) at 17:00 CET
- Cohort folder with homeworks and deadlines
- Launch stream with course overview
Overview
Prerequisites
To get the most out of this course, you should feel comfortable with coding and the command line, and know the basics of SQL. Prior experience with Python will be helpful, but you can pick up Python relatively quickly if you have experience with other programming languages.
Module 1: Containerization and Infrastructure as Code
Introduction and Prerequisites
Welcome to my blog about my first week in the Data Engineering Zoomcamp! In this first module, we will be exploring containerization and infrastructure as code. This is an exciting topic as it allows us to create reproducible and scalable environments for our data pipelines.
- Throughout the week, we will be building a data pipeline using Python scripts to ingest data from a CSV source and store it in a table in Postgres. We will be using Docker to containerize our environment and make it easy to deploy and manage. Additionally, we will be using pgcli, a command-line interface for Postgres, to interact with our database.
- To start, we will be using Jupyter Notebook to ingest our data and then move on to using pgAdmin, a web-based interface for Postgres, also running in Docker. We will also be using argparse to create a command-line interface for our Python script, making it easy to pass in arguments and customize our pipeline.
- Finally, we will be using Docker Compose to orchestrate our containers and run both Postgres and pgAdmin together. This will allow us to easily manage our environment and ensure that our pipeline is running smoothly.
Docker
- Docker is an open platform that allows you to develop, ship, and run applications. It enables you to separate your applications from your infrastructure, making it easier to deliver software quickly.
- A container runs on a host machine and is completely isolated from the host machine's environment.
- You can run multiple containers on one host, and they won't conflict with each other.
- An image is a snapshot: the set of instructions that were executed plus the resulting state, all saved together as the "image".
- Installing docker: https://docs.docker.com/get-docker/
Why should data engineers care about containerization and Docker?
- Setting up things locally for experiments
- Integration tests, CI/CD
- Batch jobs (AWS Batch, Kubernetes jobs, etc — outside of the scope)
- Spark
- Serverless (AWS Lambda)
- So containers are everywhere
Postgres
PostgreSQL is a powerful, open-source object-relational database system with over 35 years of active development. It is known for its reliability, feature robustness, and performance. PostgreSQL supports both SQL (relational) and JSON (non-relational) querying, making it suitable for a wide range of applications, including web, mobile, geospatial, and analytics applications.
- For practicing with SQL, we’ll use Postgres
- It's a general-purpose database, designed for transactions rather than analytics
- But it’s powerful enough and sometimes also used as a data warehouse - a special database for analytical queries (more in week 3)
- We can set it up locally to practice SQL and test things before doing this in the cloud
Running postgres with Docker
docker run -it \
  -e POSTGRES_USER="root" \
  -e POSTGRES_PASSWORD="root" \
  -e POSTGRES_DB="ny_taxi" \
  -v "$(pwd)/ny-taxi-volume:/var/lib/postgresql/data" \
  -p 5432:5432 \
  postgres:13
CLI for Postgres
pip install pgcli
This installs a client for postgres that we’ll use
Let’s connect to it:
pgcli -h localhost -p 5432 -u root -d ny_taxi
And check what’s there
\l   -- List databases
\dt  -- List tables
Not much there yet. Let's load some data.
NY Trips Dataset
According to the TLC data website, as of 05/13/2022 the data is provided in .parquet format instead of .csv. The website provides a useful link with sample steps for reading a .parquet file and converting it into a pandas DataFrame.
You can use the csv backup located here, https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz, to follow along with the video.
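If you do work with the newer .parquet files, a minimal sketch for loading one into pandas looks like this (the filename is just an example, and reading parquet requires pyarrow or fastparquet to be installed):
import pandas as pd

# read the parquet file into a DataFrame
# (filename is an example; requires pyarrow or fastparquet: pip install pyarrow)
df = pd.read_parquet('yellow_tripdata_2021-01.parquet')
print(df.shape)
df.head()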
pgAdmin
Running pgAdmin
docker run -it \
  -e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
  -e PGADMIN_DEFAULT_PASSWORD="root" \
  -p 8080:80 \
  dpage/pgadmin4
Running Postgres and pgAdmin together
Create a network
docker network create pg-network
Run Postgres
docker run -it \
  -e POSTGRES_USER="root" \
  -e POSTGRES_PASSWORD="root" \
  -e POSTGRES_DB="ny_taxi" \
  -v "$(pwd)/ny_taxi_postgres_data:/var/lib/postgresql/data" \
  -p 5432:5432 \
  --network=pg-network \
  --name pg-database \
  postgres:13
Run pgAdmin
docker run -it \
  -e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
  -e PGADMIN_DEFAULT_PASSWORD="root" \
  -p 8080:80 \
  --network=pg-network \
  --name pgadmin \
  dpage/pgadmin4
Data ingestion
Preview of our ingestion Python script
import os
import argparse
from time import time
import pandas as pd
from sqlalchemy import create_engine
.
.
.
        df = next(df_iter)

        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

        df.to_sql(name=table_name, con=engine, if_exists='append')

        t_end = time()
        print('inserted another chunk, took %.3f second' % (t_end - t_start))

    except StopIteration:
        print("Finished ingesting data into the postgres database")
        break
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Ingest CSV data to Postgres')

    parser.add_argument('--user', required=True, help='user name for postgres')
    parser.add_argument('--password', required=True, help='password for postgres')
    parser.add_argument('--host', required=True, help='host for postgres')
    parser.add_argument('--port', required=True, help='port for postgres')
    parser.add_argument('--db', required=True, help='database name for postgres')
    parser.add_argument('--table_name', required=True, help='name of the table where we will write the results to')
    parser.add_argument('--url', required=True, help='url of the csv file')

    args = parser.parse_args()

    main(args)
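The dots above elide the setup inside main(): downloading the file, creating the SQLAlchemy engine, and building the chunked CSV iterator. A minimal, self-contained sketch of that part is shown below; the temporary filename, chunk size, and the wget call are my assumptions, and the course's actual script may differ in the details (the Postgres connection also needs psycopg2 installed):
def main(params):
    user = params.user
    password = params.password
    host = params.host
    port = params.port
    db = params.db
    table_name = params.table_name
    url = params.url
    csv_name = 'output.csv.gz'  # assumed temporary filename

    # download the CSV (assumes wget is available on the machine)
    os.system(f"wget {url} -O {csv_name}")

    # create the SQLAlchemy engine for Postgres (needs psycopg2 installed)
    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')

    # read the CSV in chunks so large files don't have to fit in memory
    df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)

    # first chunk: create the table schema, then insert the data
    df = next(df_iter)
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
    df.to_sql(name=table_name, con=engine, if_exists='append')

    # remaining chunks: the loop shown in the preview above
    while True:
        try:
            t_start = time()
            df = next(df_iter)
            df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
            df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
            df.to_sql(name=table_name, con=engine, if_exists='append')
            t_end = time()
            print('inserted another chunk, took %.3f second' % (t_end - t_start))
        except StopIteration:
            print("Finished ingesting data into the postgres database")
            break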
Running locally
URL="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"
python ingest_data.py \
--user=root \
--password=root \
--host=localhost \
--port=5432 \
--db=ny_taxi \
--table_name=yellow_taxi_trips \
--url=${URL}
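Once the script finishes, a quick sanity check (for example from a Jupyter notebook) confirms that the rows landed in Postgres. This snippet is just illustrative and reuses the connection details from the command above:
import pandas as pd
from sqlalchemy import create_engine

# same credentials, host and database as the CLI arguments above
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

# count the ingested rows
print(pd.read_sql('SELECT COUNT(1) FROM yellow_taxi_trips', con=engine))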
Build the image
docker build -t taxi_ingest:v001 .
Run the script with Docker
URL="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"

docker run -it \
  --network=pg-network \
  taxi_ingest:v001 \
    --user=root \
    --password=root \
    --host=pg-database \
    --port=5432 \
    --db=ny_taxi \
    --table_name=yellow_taxi_trips \
    --url=${URL}
Docker Compose
Docker Compose is a tool for defining and running multi-container applications. It simplifies the control of an entire application stack, making it easy to manage services, networks, and volumes in a single, comprehensible YAML configuration file. With Docker Compose, you can create and start all the services from your configuration file with a single command.
Run it:
docker-compose up
Run in detached mode:
docker-compose up -d
Shutting it down:
docker-compose down
Note: to make pgAdmin configuration persistent, create a folder data_pgadmin. Change its permission via
sudo chown 5050:5050 data_pgadmin
and mount it to the /var/lib/pgadmin folder:
services:
  pgadmin:
    image: dpage/pgadmin4
    volumes:
      - ./data_pgadmin:/var/lib/pgadmin
SQL
- Adding the Zones table
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
df_zones = pd.read_csv('taxi+_zone_lookup.csv')
df_zones.head()
df_zones.to_sql(name='zones', con=engine, if_exists='replace')
query = """
SELECT * FROM zones LIMIT 10
"""
pd.read_sql(query, con=engine)
df_green = pd.read_csv('./green_tripdata_2019-09.csv.gz')
df_green.head()
print(pd.io.sql.get_schema(df_green, name='green_taxi_data'))
df_green.lpep_pickup_datetime = pd.to_datetime(df_green.lpep_pickup_datetime)
df_green.lpep_dropoff_datetime = pd.to_datetime(df_green.lpep_dropoff_datetime)
df_green
df_green.to_sql(name='green_taxi_data', con=engine, if_exists='replace')
query = """
SELECT * FROM green_taxi_data LIMIT 10
"""
pd.read_sql(query, con=engine)
- Inner joins (an example query is shown at the end of this section)
- Basic data quality checks
- Left, Right and Outer joins
- Group by
SELECT *
FROM pg_catalog.pg_tables
WHERE schemaname != 'pg_catalog' AND
schemaname != 'information_schema';
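To connect the join and group-by topics above to the tables we just created, here is an illustrative query run through pandas, the same way as the earlier snippets. The column names ("PULocationID", "LocationID", "Zone") follow the standard TLC schema as written by to_sql above; adjust them if your tables differ:
import pandas as pd
from sqlalchemy import create_engine

# reuse the same local Postgres connection as earlier in the notebook
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

# inner join trips to the zones lookup table and count trips per pickup zone
query = """
SELECT
    zpu."Zone" AS pickup_zone,
    COUNT(1) AS trip_count
FROM green_taxi_data t
JOIN zones zpu
    ON t."PULocationID" = zpu."LocationID"
GROUP BY pickup_zone
ORDER BY trip_count DESC
LIMIT 10;
"""

pd.read_sql(query, con=engine)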