Data Engineering Zoomcamp, Module 1: Data Ingestion with Docker

Data Engineering Zoomcamp, Week 1

2024 Cohort

Overview

Prerequisites

To get the most out of this course, you should feel comfortable with coding and the command line and know the basics of SQL. Prior experience with Python will be helpful, but you can pick up Python relatively fast if you have experience with other programming languages.

Module 1: Containerization and Infrastructure as Code

Introduction and Prerequisites

Welcome to my blog about my first week in the Data Engineering Zoomcamp! In this first module, we will be exploring containerization and infrastructure as code. This is an exciting topic as it allows us to create reproducible and scalable environments for our data pipelines.

  • Throughout the week, we will be building a data pipeline using Python scripts to ingest data from a CSV source and store it in a table in Postgres. We will be using Docker to containerize our environment and make it easy to deploy and manage. Additionally, we will be using pgcli, a command-line interface for Postgres, to interact with our database.
  • To start, we will be using Jupyter Notebook to ingest our data and then move on to using pgAdmin, a web-based interface for Postgres, also running in Docker. We will also be using argparse to create a command-line interface for our Python script, making it easy to pass in arguments and customize our pipeline.
  • Finally, we will be using Docker Compose to orchestrate our containers and run both Postgres and pgAdmin together. This will allow us to easily manage our environment and ensure that our pipeline is running smoothly.

Docker

  • Docker is an open platform that allows you to develop, ship, and run applications. It enables you to separate your applications from your infrastructure, making it easier to deliver software quickly.
  • A container is like a self-contained box that you run on a host machine. It is isolated from the host machine’s environment.
  • You can run multiple containers on one host without them conflicting with each other.
  • An image is the set of instructions that were executed plus the resulting state, all saved together. Containers are started from images.
  • Installing docker: https://docs.docker.com/get-docker/

Why should data engineers care about containerization and docker?

  • Setting up things locally for experiments
  • Integration tests, CI/CD
  • Batch jobs (AWS Batch, Kubernetes jobs, etc — outside of the scope)
  • Spark
  • Serverless (AWS Lambda)
  • So containers are everywhere

Postgres

PostgreSQL is a powerful, open-source object-relational database system with over 35 years of active development. It is known for its reliability, feature robustness, and performance. PostgreSQL supports both SQL (relational) and JSON (non-relational) querying, making it suitable for a wide range of applications, including web, mobile, geospatial, and analytics applications.

  • For practicing with SQL, we’ll use Postgres
  • It’s a general purpose database, built for transactions rather than analytics
  • But it’s powerful enough and sometimes also used as a data warehouse - a special database for analytical queries (more in week 3)
  • We can set it up locally to practice SQL and test things before doing this in the cloud

Running postgres with Docker

   docker run -it \
     -e POSTGRES_USER="root" \
     -e POSTGRES_PASSWORD="root" \
     -e POSTGRES_DB="ny_taxi" \
     -v "./ny-taxi-volume:/var/lib/postgresql/data" \
     -p 5432:5432 \
     postgres:13

CLI for Postgres

   pip install pgcli

This installs a client for postgres that we’ll use

Let’s connect to it:

   pgcli -h localhost -p 5432 -u root -d ny_taxi

And check what’s there

   \l   -- List databases
   \dt  -- List tables

Not much. Let’s load some data.

NY Trips Dataset

According to the TLC data website, as of 05/13/2022 the data is published in .parquet format instead of .csv. The website provides a useful link with sample steps for reading a .parquet file and converting it to a Pandas DataFrame.

You can use the csv backup located here, https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz, to follow along with the video.
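
Before loading anything, it can help to peek at the file. The sketch below is one way to read the first rows of a gzipped CSV using only the Python standard library. The helper name `peek_csv_gz` and the tiny sample file it writes are illustrative assumptions, not part of the course material; the column names mirror the datetime columns used later in the ingestion script.

```python
import csv
import gzip
import os
import tempfile
from itertools import islice


def peek_csv_gz(path, n=5):
    """Return the header and the first n data rows of a gzipped CSV file."""
    # 'rt' opens the compressed file in text mode so csv can read it directly.
    with gzip.open(path, "rt", newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(islice(reader, n))
    return header, rows


# Write a tiny stand-in file (hypothetical values) so the example runs offline.
sample_path = os.path.join(tempfile.mkdtemp(), "sample_tripdata.csv.gz")
with gzip.open(sample_path, "wt", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance"])
    writer.writerow(["2021-01-01 00:30:10", "2021-01-01 00:36:12", "2.10"])
    writer.writerow(["2021-01-01 00:51:20", "2021-01-01 00:52:19", "0.20"])

header, rows = peek_csv_gz(sample_path, n=1)
print(header)
print(rows)
```

To try it on the real data, download the backup file first and pass its local path to `peek_csv_gz` instead of `sample_path`.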

pgAdmin

Running pgAdmin

   docker run -it \
     -e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
     -e PGADMIN_DEFAULT_PASSWORD="root" \
     -p 8080:80 \
     dpage/pgadmin4

Running Postgres and pgAdmin together

Create a network

   docker network create pg-network

Run Postgres

   docker run -it \
     -e POSTGRES_USER="root" \
     -e POSTGRES_PASSWORD="root" \
     -e POSTGRES_DB="ny_taxi" \
     -v "$(pwd)/ny_taxi_postgres_data":/var/lib/postgresql/data \
     -p 5432:5432 \
     --network=pg-network \
     --name pg-database \
     postgres:13

Run pgAdmin

   docker run -it \
     -e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
     -e PGADMIN_DEFAULT_PASSWORD="root" \
     -p 8080:80 \
     --network=pg-network \
     --name pgadmin \
     dpage/pgadmin4

Data ingestion

Preview of our ingestion python script

   import os
   import argparse
   from time import time
   import pandas as pd
   from sqlalchemy import create_engine

   # ... (part of the script omitted) ...

               df = next(df_iter)

               df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
               df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

               df.to_sql(name=table_name, con=engine, if_exists='append')

               t_end = time()

               print('inserted another chunk, took %.3f second' % (t_end - t_start))

           except StopIteration:
               print("Finished ingesting data into the postgres database")
               break

   if __name__ == '__main__':
       parser = argparse.ArgumentParser(description='Ingest CSV data to Postgres')

       parser.add_argument('--user', required=True, help='user name for postgres')
       parser.add_argument('--password', required=True, help='password for postgres')
       parser.add_argument('--host', required=True, help='host for postgres')
       parser.add_argument('--port', required=True, help='port for postgres')
       parser.add_argument('--db', required=True, help='database name for postgres')
       parser.add_argument('--table_name', required=True, help='name of the table where we will write the results to')
       parser.add_argument('--url', required=True, help='url of the csv file')

       args = parser.parse_args()

       main(args)
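
The visible part of the script pulls chunks from an iterator and appends each one to the table until `StopIteration` signals that the data has run out. As a rough sketch of that pattern, the example below uses only the standard library: `sqlite3` stands in for Postgres and a small in-memory list stands in for the CSV, so it runs without a database server. The names `chunked`, `ingest_in_chunks`, and the `trips` table are hypothetical and not taken from the original script.

```python
import sqlite3
from itertools import islice
from time import time


def chunked(rows, size):
    """Yield successive lists of at most `size` rows from any iterable."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def ingest_in_chunks(conn, rows, chunk_size):
    """Insert rows chunk by chunk and return the number of chunks written."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS trips (pickup TEXT, dropoff TEXT, distance REAL)"
    )
    chunk_iter = chunked(rows, chunk_size)
    n_chunks = 0
    while True:
        try:
            t_start = time()
            chunk = next(chunk_iter)  # raises StopIteration when the data runs out
            conn.executemany("INSERT INTO trips VALUES (?, ?, ?)", chunk)
            conn.commit()
            n_chunks += 1
            print("inserted another chunk, took %.3f second" % (time() - t_start))
        except StopIteration:
            print("Finished ingesting data")
            break
    return n_chunks


# Hypothetical sample rows; a real run would stream them from the CSV file.
sample_rows = [("2021-01-01 00:30:10", "2021-01-01 00:36:12", 2.1)] * 5
conn = sqlite3.connect(":memory:")
n_chunks = ingest_in_chunks(conn, sample_rows, chunk_size=2)
total = conn.execute("SELECT COUNT(*) FROM trips").fetchone()[0]
print(n_chunks, total)
```

Working in chunks keeps memory use bounded, which matters for a file with over a million rows.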

Running locally

   URL="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"

   python ingest_data.py \
     --user=root \
     --password=root \
     --host=localhost \
     --port=5432 \
     --db=ny_taxi \
     --table_name=yellow_taxi_trips \
     --url=${URL}

Build the image

   docker build -t taxi_ingest:v001 .

Run the script with Docker

   URL="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"

   docker run -it \
     --network=pg-network \
     taxi_ingest:v001 \
       --user=root \
       --password=root \
       --host=pg-database \
       --port=5432 \
       --db=ny_taxi \
       --table_name=yellow_taxi_trips \
       --url=${URL}
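
The only connection detail that changes between the local run and the containerized run is the host: `localhost` from the host machine versus the container name `pg-database` inside `pg-network`. Here is a minimal sketch of how the script's arguments might be turned into a SQLAlchemy-style connection URL. `build_postgres_url` is a hypothetical helper, and the exact URL construction in the original script is not shown in the preview.

```python
from urllib.parse import quote_plus


def build_postgres_url(user, password, host, port, db):
    """Build a postgresql:// URL, escaping characters that are unsafe in URLs."""
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db}"


# From the host machine, Postgres is reachable through the published port.
local_url = build_postgres_url("root", "root", "localhost", 5432, "ny_taxi")

# Inside pg-network, other containers reach it by its container name.
docker_url = build_postgres_url("root", "root", "pg-database", 5432, "ny_taxi")

print(local_url)
print(docker_url)
```

Escaping the user and password is a defensive choice for credentials that contain characters such as `@`; with `root`/`root` it changes nothing.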

Docker Compose

Docker Compose is a tool for defining and running multi-container applications. It simplifies the control of an entire application stack, making it easy to manage services, networks, and volumes in a single, comprehensible YAML configuration file. With Docker Compose, you can create and start all the services from your configuration file with a single command.
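
The compose file itself is not shown here, so below is a sketch of what it might look like, mirroring the two `docker run` commands used earlier. To keep the examples in one language, a short Python script writes the file. The service names `pgdatabase` and `pgadmin` and the file name `docker-compose.yaml` are assumptions. Compose puts the services of one file on a shared default network, so no explicit `docker network create` is needed in this setup.

```python
import os
import tempfile

# Assumed compose definition mirroring the earlier docker run commands.
COMPOSE_YAML = """\
services:
  pgdatabase:
    image: postgres:13
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=root
      - POSTGRES_DB=ny_taxi
    volumes:
      - "./ny_taxi_postgres_data:/var/lib/postgresql/data:rw"
    ports:
      - "5432:5432"
  pgadmin:
    image: dpage/pgadmin4
    environment:
      - PGADMIN_DEFAULT_EMAIL=admin@admin.com
      - PGADMIN_DEFAULT_PASSWORD=root
    ports:
      - "8080:80"
"""


def write_compose_file(directory, filename="docker-compose.yaml"):
    """Write the compose definition to `directory` and return the file path."""
    path = os.path.join(directory, filename)
    with open(path, "w") as f:
        f.write(COMPOSE_YAML)
    return path


# A temporary directory is used here; in practice this would be the project folder.
compose_path = write_compose_file(tempfile.mkdtemp())
with open(compose_path) as f:
    written = f.read()
print(written)
```

With a file like this, pgAdmin would reach the database at host `pgdatabase`, since services on the same Compose network can address each other by service name.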

Run it:

   docker-compose up

Run in detached mode:

   docker-compose up -d

Shutting it down:

   docker-compose down

Note: to make the pgAdmin configuration persistent, create a folder data_pgadmin. Change its owner to user and group ID 5050, which pgAdmin runs as inside the container, via

   sudo chown 5050:5050 data_pgadmin

and mount it to the /var/lib/pgadmin folder:

   services:
     pgadmin:
       image: dpage/pgadmin4
       volumes:
         - ./data_pgadmin:/var/lib/pgadmin

SQL

  • Adding the Zones table
   # Notebook cells: assume pandas is imported as pd and engine is the SQLAlchemy engine created earlier
   !wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
   df_zones = pd.read_csv('taxi+_zone_lookup.csv')
   df_zones.head()
   df_zones.to_sql(name='zones', con=engine, if_exists='replace')
   query = """
   SELECT * FROM zones LIMIT 10
   """

   pd.read_sql(query, con=engine)

   # Load the green taxi trips and convert the datetime columns before writing
   df_green = pd.read_csv('./green_tripdata_2019-09.csv.gz')
   df_green.head()
   print(pd.io.sql.get_schema(df_green, name='green_taxi_data'))
   df_green.lpep_pickup_datetime = pd.to_datetime(df_green.lpep_pickup_datetime)
   df_green.lpep_dropoff_datetime = pd.to_datetime(df_green.lpep_dropoff_datetime)
   df_green
   df_green.to_sql(name='green_taxi_data', con=engine, if_exists='replace')
   query = """
   SELECT * FROM green_taxi_data LIMIT 10
   """

   pd.read_sql(query, con=engine)

  • Inner joins
  • Basic data quality checks
  • Left, Right and Outer joins
  • Group by
   SELECT *
   FROM pg_catalog.pg_tables
   WHERE schemaname != 'pg_catalog'
     AND schemaname != 'information_schema';
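
The query types listed above can be tried on a toy dataset. The sketch below uses an in-memory SQLite database as a stand-in for Postgres. The tables are trimmed-down, hypothetical versions of `zones` and `green_taxi_data`, and the rows are made up for illustration. The column names follow the public taxi dataset but should be treated as assumptions here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE zones (LocationID INTEGER, Zone TEXT);
CREATE TABLE green_taxi_data (PULocationID INTEGER, total_amount REAL);
INSERT INTO zones VALUES (1, 'Astoria'), (2, 'Bushwick');
INSERT INTO green_taxi_data VALUES (1, 10.0), (1, 5.5), (2, 7.0), (99, 3.0);
""")

# Inner join: only trips whose pickup location exists in zones.
inner_rows = conn.execute("""
    SELECT t.total_amount, z.Zone
    FROM green_taxi_data t
    JOIN zones z ON t.PULocationID = z.LocationID
""").fetchall()

# Left join as a basic data quality check: trips with no matching zone.
orphans = conn.execute("""
    SELECT t.PULocationID
    FROM green_taxi_data t
    LEFT JOIN zones z ON t.PULocationID = z.LocationID
    WHERE z.LocationID IS NULL
""").fetchall()

# Group by: trip count and total revenue per pickup zone.
per_zone = conn.execute("""
    SELECT z.Zone, COUNT(*) AS trips, SUM(t.total_amount) AS revenue
    FROM green_taxi_data t
    JOIN zones z ON t.PULocationID = z.LocationID
    GROUP BY z.Zone
    ORDER BY z.Zone
""").fetchall()

print(len(inner_rows), orphans, per_zone)
```

Right and full outer joins follow the same pattern in Postgres. They are left out of this sketch because older SQLite versions do not support them.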

Source: https://www.postgresqltutorial.com