Home

Published

- 6 min read

Data ingestion with dlt Workshop Homework

img of Data ingestion with dlt Workshop Homework

Homework: Data talks club data engineering zoomcamp Data loading workshop

1. Use a generator

Question 1: What is the sum of the outputs of the generator for limit = 5?

  • A: 10.23433234744176
  • B: 7.892332347441762
  • C: 8.382332347441762
  • D: 9.123332347441762
   %%capture
!pip install dlt[duckdb]

def square_root_generator(limit):
    n = 1
    while n <= limit:
        yield n ** 0.5
        n += 1
   # Example limit=5:
limit = 5
generator = square_root_generator(limit)
sum = 0
for sqrt_value in generator:
  sum += sqrt_value
print(f'yielded number : {sqrt_value}')
print(f'Sum is {sum}')

yielded number : 1.0
yielded number : 1.4142135623730951
yielded number : 1.7320508075688772
yielded number : 2.0
yielded number : 2.23606797749979
Sum is 8.382332347441762

Question 2: What is the 13th number yielded by the generator?

  • A: 4.236551275463989
  • B: 3.605551275463989
  • C: 2.345551275463989
  • D: 5.678551275463989
   limit = 13
generator = square_root_generator(limit)
count = 1
for sqrt_value in generator:
  print(f'yielded number {count} is : {sqrt_value}')
count += 1
yielded number 1 is: 1.0
......
yielded number 13 is: 3.605551275463989

2. Append a generator to a table with existing data

Below you have 2 generators. You will be tasked to load them to duckdb and answer some questions from the data

   def people_1():
for i in range(1, 6):
        yield { "ID": i, "Name": f"Person_{i}", "Age": 25 + i, "City": "City_A" }

for person in people_1():
  print(person)


def people_2():
for i in range(3, 9):
        yield { "ID": i, "Name": f"Person_{i}", "Age": 30 + i, "City": "City_B", "Occupation": f"Job_{i}" }


for person in people_2():
  print(person)

{'ID': 1, 'Name': 'Person_1', 'Age': 26, 'City': 'City_A'}
{'ID': 2, 'Name': 'Person_2', 'Age': 27, 'City': 'City_A'}
{'ID': 3, 'Name': 'Person_3', 'Age': 28, 'City': 'City_A'}
{'ID': 4, 'Name': 'Person_4', 'Age': 29, 'City': 'City_A'}
{'ID': 5, 'Name': 'Person_5', 'Age': 30, 'City': 'City_A'}
{'ID': 3, 'Name': 'Person_3', 'Age': 33, 'City': 'City_B', 'Occupation': 'Job_3'}
{'ID': 4, 'Name': 'Person_4', 'Age': 34, 'City': 'City_B', 'Occupation': 'Job_4'}
{'ID': 5, 'Name': 'Person_5', 'Age': 35, 'City': 'City_B', 'Occupation': 'Job_5'}
{'ID': 6, 'Name': 'Person_6', 'Age': 36, 'City': 'City_B', 'Occupation': 'Job_6'}
{'ID': 7, 'Name': 'Person_7', 'Age': 37, 'City': 'City_B', 'Occupation': 'Job_7'}
{'ID': 8, 'Name': 'Person_8', 'Age': 38, 'City': 'City_B', 'Occupation': 'Job_8'}

1. Load the first generator and calculate the sum of ages of all people. Make sure to only load it once.

   import dlt

people = []
def people_1():
    for i in range(1, 6):
        yield {"ID": i, "Name": f"Person_{i}", "Age": 25 + i, "City": "City_A"}


for person in people_1():
    people.append(person)
    
    
first_generator = dlt.pipeline(destination='duckdb', dataset_name='generators')

info = first_generator.run(people_1(),
                           table_name = "table_1",
                           write_disposition ="replace")

print(info)

...........................

import duckdb

database_file = f"{first_generator.pipeline_name}.duckdb"


conn = duckdb.connect(database_file)

# List the tables

conn.sql(f"SET search_path ={first_generator.dataset_name}")
print('Loaded Tables:')
display(conn.sql("show tables"))


print("\n\n\n table_1 table below:")
q1 = conn.sql("SELECT * FROM table_1").df()
display(q1)


print("\n\n\n The sum of ages is :")
q2 = conn.sql("SELECT sum(age) as sum_of_age FROM table_1").df()
display(q2)
   Loaded Tables:
┌─────────────────────┐
name
varchar
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ table_1             │
└─────────────────────┘


 table_1 table below:
id	name	age	city	_dlt_load_id	_dlt_id	occupation
0	1	Person_1	26	City_A	1707401339.3088036	UroVJbepmFKqnA	None
1	2	Person_2	27	City_A	1707401339.3088036	Y + 0JyQyF59jEtw	None
2	3	Person_3	28	City_A	1707401339.3088036	SgqyU2 / uww7EpA	None
3	4	Person_4	29	City_A	1707401339.3088036	JvAWjI3OX + OX2A	None
4	5	Person_5	30	City_A	1707401339.3088036	bePdJIzeRdmJZA	None


 The sum of ages is:
sum_of_age
0	140.0

2. Append the second generator to the same table as the first.

   #Appending the second generator to the table
def people_2():
for i in range(3, 9):
        yield { "ID": i, "Name": f"Person_{i}", "Age": 30 + i, "City": "City_B", "Occupation": f"Job_{i}" }


for person in people_2():
  people.append(person)

second_generator = dlt.pipeline(destination = 'duckdb', dataset_name = 'generators')

info = first_generator.run(people_2(),
  table_name = "table_1",
  write_disposition = "append")

print(info)

Question 3. Append the 2 generators. After correctly appending the data, calculate the sum of all ages of people.

  • A: 353
  • B: 365
  • C: 378
  • D: 390
   import duckdb

database_file = f"{second_generator.pipeline_name}.duckdb"


conn = duckdb.connect(database_file)

# List the tables

conn.sql(f"SET search_path ={second_generator.dataset_name}")
print("\n\n\n table_1 table below:")
q1 = conn.sql("SELECT * FROM table_1").df()
display(q1)


print("\n\n\n The sum of all ages is :")
q2 = conn.sql("SELECT sum(age) as sum_of_age FROM table_1").df()
display(q2)
   table_1 table below:
id	name	age	city	_dlt_load_id	_dlt_id	occupation
0	1	Person_1	26	City_A	1707400876.891016	Mm9q5s7ftV6CrQ	None
1	2	Person_2	27	City_A	1707400876.891016	b5SdRFte3A4ViA	None
2	3	Person_3	28	City_A	1707400876.891016	l8 + 6BvXG1yPPVA	None
3	4	Person_4	29	City_A	1707400876.891016	kjnf3OKVXLg8vg	None
4	5	Person_5	30	City_A	1707400876.891016	POlBfP7tbJbFWg	None
5	3	Person_3	33	City_B	1707400888.477188	jUOStLvVMJ7CgA	Job_3
6	4	Person_4	34	City_B	1707400888.477188	p02RLSQWHGRr4Q	Job_4
7	5	Person_5	35	City_B	1707400888.477188	3zc2BoiIDQ1luA	Job_5
8	6	Person_6	36	City_B	1707400888.477188	9Tc / yaE0kBKmqA	Job_6
9	7	Person_7	37	City_B	1707400888.477188	QlTEQtVCT8XaoQ	Job_7
10	8	Person_8	38	City_B	1707400888.477188	kS4pWo / s2bywFA	Job_8


 The sum of all ages is:
sum_of_age
0	353.0

3.After correctly appending the data, calculate the sum of all ages of people.

   second_generator = dlt.pipeline(destination = 'duckdb', dataset_name = 'generators')

info = second_generator.run(people_2(),
  table_name = "table_1",
  primary_key = "id",
  write_disposition = "merge")

print(info)

3. Merge a generator

Re-use the generators from Exercise 2.

A table’s primary key needs to be created from the start, so load your data to a new table with primary key ID.

Load your first generator first, and then load the second one with merge. Since they have overlapping IDs, some of the records from the first load should be replaced by the ones from the second load.

After loading, you should have a total of 8 records, and ID 3 should have age 33.

Question 4. Merge the 2 generators using the ID column. Calculate the sum of ages of all the people loaded as described above.

  • A: 215
  • B: 266
  • C: 241
  • D: 258
   import duckdb
database_file = f"{second_generator.pipeline_name}.duckdb"
conn = duckdb.connect(database_file)
conn.sql(f"SET search_path ={second_generator.dataset_name}")
# List the merged tabel's data
q3 = conn.sql("SELECT * FROM table_1").df()
display(q3)


print("\n\n\n The sum of all ages in merged table is :")
q4 = conn.sql("SELECT sum(age) as sum_of_age FROM table_1").df()
display(q4)
   
id	name	age	city	_dlt_load_id	_dlt_id	occupation
0	1	Person_1	26	City_A	1707400876.891016	Mm9q5s7ftV6CrQ	None
1	2	Person_2	27	City_A	1707400876.891016	b5SdRFte3A4ViA	None
2	5	Person_5	35	City_B	1707401322.7057714	4qFdiJ9Tkb8TxQ	Job_5
3	4	Person_4	34	City_B	1707401322.7057714	DpeMCKsYZv+G1w	Job_4
4	7	Person_7	37	City_B	1707401322.7057714	b4Lu8XOKI4KR5A	Job_7
5	3	Person_3	33	City_B	1707401322.7057714	+RgMiLCDeACrdw	Job_3
6	6	Person_6	36	City_B	1707401322.7057714	yz72fWEjCkV1ng	Job_6
7	8	Person_8	38	City_B	1707401322.7057714	3+XhEml85fbdpw	Job_8


 The sum of all ages in merged table is :
sum_of_age
0	266.0