Data Work (ETL + EDA)

AI Professionals Bootcamp | Week 2

2025-12-22

Day 2: Verify + Clean + Join (first real EDA)

Goal: add data checks + safe joins so your EDA won’t lie.

Bootcamp • SDAIA Academy

Today’s Flow

  • Session 1 (60m): Verify — turn assumptions into checks
  • Asr Prayer (20m)
  • Session 2 (60m): Clean — missingness + categories + duplicates
  • Maghrib Prayer (20m)
  • Session 3 (60m): Join + report — safe merges + first EDA summary
  • Isha Prayer (20m)
  • Hands-on (120m): Build checks.py + run_day2_validate_and_report.py

Learning Objectives

By the end of today, you can:

  • write lightweight data quality checks (columns, non-empty, uniqueness, ranges)
  • quantify missingness (counts + %) and choose drop vs flag vs careful impute (EDA-safe default)
  • normalize messy text categories into *_clean columns and dedupe using a business key rule
  • join tables safely with pd.merge(..., validate=...), detect join explosions with row-count checks, and measure match rate
  • produce reproducible artifacts: processed Parquet + quality artifacts (e.g. missingness CSV) + a Markdown summary in reports/

Warm-up (5 minutes)

Run Day 1 and confirm Parquet outputs still look typed.

macOS/Linux

uv run python -m scripts.run_day1_load
uv run python -c "import pandas as pd; df=pd.read_parquet('data/processed/orders.parquet'); print(df.dtypes)"

Windows PowerShell

uv run python -m scripts.run_day1_load
uv run python -c "import pandas as pd; df=pd.read_parquet('data/processed/orders.parquet'); print(df.dtypes)"

Checkpoint: data/processed/orders.parquet exists and user_id is string.

Quick review: where we left off (Day 1)

  • Import-safe project layout (packages + module execution)

  • Typed processed outputs in data/processed/ (Parquet preserves dtypes)

  • Canonical workflow:

    • Load → Verify → Clean → Transform → Analyze → Visualize → Conclude

Note

Day 1 mostly covered Load + “typed I/O”. Today we start Verify + Clean in earnest.

Today’s deliverable (what you’ll commit)

A Day 2 pipeline that runs end-to-end from repo root:

  • bootcamp_data/checks.py (small, testable checks)

  • extra helpers in bootcamp_data/transforms.py (flags, normalization, dedupe)

  • scripts/run_day2_validate_and_report.py (wire it together)

  • artifacts:

    • data/processed/orders_clean.parquet
    • data/processed/orders_enriched.parquet
    • reports/day2_missingness_orders.csv
    • reports/day2_summary.md (row counts + duplicates + join match rate + a missingness snapshot)

uv quick review (so commands are consistent)

  • Run inside your project environment:

    • uv run <command>
  • Install a dependency (if we decide we need one):

    • uv add <package>

Tip

We’ll stick to uv run ... so we don’t depend on “did you activate your venv?”

If imports fail: fix the root cause (no hacks)

If you see:

ModuleNotFoundError: No module named 'bootcamp_data'

Do this checklist (in order):

  1. Are you in repo root? (folder that contains bootcamp_data/ and scripts/)

  2. Are you running as a module?

    • uv run python -m scripts.run_day1_load
    • uv run python -m scripts.run_day2_validate_and_report
  3. Does bootcamp_data/__init__.py exist?

  4. Are you using the right Python?

macOS/Linux

uv run python -c "import bootcamp_data; print('import ok')"

Windows PowerShell

uv run python -c "import bootcamp_data; print('import ok')"

Checkpoint

Raise your hand when:

  • you can run Day 1 using uv run python -m ...
  • you can explain (in 1 sentence) why we avoid “import hacks”

Session 1

Verify — turn assumptions into checks

Session 1 objectives

By the end of this session, you can:

  • explain “fail fast” and why it saves time

  • write checks for:

    • required columns
    • non-empty datasets
    • duplicate keys
    • basic numeric ranges

Why data bugs are usually silent

Bad data rarely crashes your code.

Instead it produces:

  • wrong totals
  • wrong joins
  • wrong charts
  • confident wrong conclusions

“Fail fast” in one sentence

Turn assumptions into checks that stop the pipeline with a clear message.

Tip

A pipeline that crashes early is annoying. A pipeline that silently lies is dangerous.

Where checks live (separation of concerns)

Keep your pipeline debuggable:

  • bootcamp_data/io.py → read/write
  • bootcamp_data/transforms.py → transforms (df -> df)
  • bootcamp_data/checks.py → validations (fail fast)
  • scripts/...py → thin entrypoints (wires everything together)

Check 1 — required columns

If a required column is missing, nothing else matters.

def require_columns(df, cols, *, df_name="df"):
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"{df_name}: missing columns {missing}")

Micro-exercise: break it on purpose (6 minutes)

  1. Load data/processed/orders.parquet
  2. Run require_columns with one fake column ("NOT_A_COL")
  3. Confirm the error message names the missing column

Checkpoint: you get a clear ValueError that names NOT_A_COL.

Solution (example)

import pandas as pd

orders = pd.read_parquet("data/processed/orders.parquet")

def require_columns(df, cols, *, df_name="df"):
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"{df_name}: missing columns {missing}")

require_columns(orders, ["order_id", "user_id", "NOT_A_COL"], df_name="orders")

Check 2 — non-empty inputs

A surprising number of pipelines produce 0 rows.

Common reasons:

  • wrong file path
  • wrong filter
  • upstream extract returned nothing

def assert_non_empty(df, *, df_name="df"):
    if len(df) == 0:
        raise ValueError(f"{df_name}: 0 rows")

Micro-exercise: predict the failure (4 minutes)

You wrote:

paid = orders[orders["status"] == "paid"]

Give two reasons paid might be empty even if there are paid orders.

Checkpoint: you can name 2 realistic reasons.

Solution: common reasons

  • inconsistent casing ("Paid", "PAID", "paid")
  • hidden whitespace ("paid ")

Today we’ll normalize categories and make joins + groupbys safer.

Check 3 — key integrity (duplicates)

Keys connect tables. If keys are wrong, joins lie.

Fast question:

  • “How many duplicate key rows do we have?”

dup_rows = df["order_id"].duplicated(keep=False)
n_dup_rows = int(dup_rows.sum())

Micro-exercise: measure duplicate order IDs (6 minutes)

  1. Load data/processed/orders.parquet
  2. Compute how many rows have a duplicate order_id

Checkpoint: you can answer: “duplicates = 0” or “duplicates > 0”.

Solution (example)

import pandas as pd

orders = pd.read_parquet("data/processed/orders.parquet")
dup_rows = orders["order_id"].duplicated(keep=False)
print("duplicate rows:", int(dup_rows.sum()))

Check 4 — range checks (practical)

Even after parsing, numbers can be invalid.

Examples:

  • amount >= 0
  • quantity >= 0

def assert_in_range(s, lo=None, hi=None, name="value"):
    x = s.dropna()
    if lo is not None and not (x >= lo).all():
        raise ValueError(f"{name}: below {lo}")
    if hi is not None and not (x <= hi).all():
        raise ValueError(f"{name}: above {hi}")

Micro-exercise: check for negative amounts (5 minutes)

  1. Load orders.parquet
  2. Count how many rows have amount < 0 (ignore missing)

Checkpoint: you can report n_negative.

Solution (example)

import pandas as pd

orders = pd.read_parquet("data/processed/orders.parquet")
n_negative = int((orders["amount"].dropna() < 0).sum())
print("n_negative:", n_negative)

Session 1 recap

  • Checks turn assumptions into clear, early failures

  • Start with high ROI:

    • required columns
    • non-empty
    • duplicates / key sanity
    • numeric ranges
  • Keep checks separate from transforms and I/O

Asr break

20 minutes

When you return: we’ll quantify missingness and clean without deleting the story.

Session 2

Clean — missingness + categories + duplicates

Session 2 objectives

By the end of this session, you can:

  • measure missingness (counts + %)
  • explain why blanket .dropna() is risky
  • default to missingness flags for EDA
  • normalize messy categories into *_clean
  • apply an explicit dedupe rule (and record what you did)

Missingness is a signal (not just a mess)

Missing values can mean:

  • unknown
  • not applicable
  • upstream bug
  • intentionally blank

Your move: measure first, then decide.

Missingness report (counts + %)

def missingness_report(df):
    n = len(df)
    rep = df.isna().sum().rename("n_missing").to_frame()
    rep["p_missing"] = rep["n_missing"] / (n if n else 1)
    return rep.sort_values("p_missing", ascending=False)

Micro-exercise: run the report (6 minutes)

  1. Load data/processed/orders.parquet
  2. Run missingness_report(orders)
  3. Print the top 5 rows

Checkpoint: you can name the most-missing column.

Solution (example)

import pandas as pd

orders = pd.read_parquet("data/processed/orders.parquet")
rep = missingness_report(orders)
print(rep.head(5))

Decide: drop vs impute vs flag

A simple rule for analytics/EDA:

  • Drop: only when rows are unusable
  • Impute: only with strong justification
  • Flag: often the best default (*_isna)

Tip

Flags let you analyze “missingness patterns” later.
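
The three options above can be sketched side by side on toy data (column names are illustrative; in the hands-on you'll apply the flag pattern to your real `orders` frame):

```python
import pandas as pd

orders = pd.DataFrame({"order_id": ["o1", "o2", "o3"],
                       "amount": [10.0, None, 30.0]})

# Drop: only when rows are unusable -- and record how many you dropped
dropped = orders.dropna(subset=["amount"])
print("rows dropped:", len(orders) - len(dropped))

# Impute: only with strong justification (the median here is just a placeholder)
imputed = orders.assign(amount=orders["amount"].fillna(orders["amount"].median()))

# Flag (EDA-safe default): keep the NA, record that it was missing
flagged = orders.assign(amount__isna=orders["amount"].isna())
print(flagged)
```

Note that drop and impute both destroy information; the flag keeps the original value (still NA) plus a record of its missingness.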

Pattern: add missingness flags

def add_missing_flags(df, cols):
    out = df.copy()
    for c in cols:
        out[f"{c}__isna"] = out[c].isna()
    return out

Micro-exercise: add flags (5 minutes)

Add flags for:

  • amount
  • quantity

Checkpoint: your DataFrame has amount__isna and quantity__isna.

Solution (example)

orders2 = orders.copy()
orders2["amount__isna"] = orders2["amount"].isna()
orders2["quantity__isna"] = orders2["quantity"].isna()

print(orders2[["amount", "amount__isna", "quantity", "quantity__isna"]].head())

Text categories: normalize + map (so EDA is consistent)

Category columns (like status) can create fake groups:

  • "Paid", "PAID", " paid " become 3 categories
  • typos/synonyms ("refunded" vs "refund") split the story

Best practice:

  • keep the original column
  • create a new *_clean column you use for analysis

import re

_ws = re.compile(r"\s+")
status_norm = (
    orders["status"].astype("string")
    .str.strip()
    .str.casefold()
    .str.replace(_ws, " ", regex=True)
)
mapping = {"refunded": "refund"}
status_clean = status_norm.map(lambda x: mapping.get(x, x))

Note

In re.compile(r"\s+"), the pattern \s+ matches one or more whitespace characters (spaces, tabs, newlines), so any run of whitespace collapses to a single space.

Micro-exercise: normalize + map status (6 minutes)

  1. Print orders["status"].value_counts(dropna=False).head(10)
  2. Create status_norm (trim + casefold + whitespace collapse)
  3. Add a small mapping (at least one synonym)
  4. Print counts for the clean version

Checkpoint: your “clean” counts look more consistent than the raw counts.

Solution (example)

import re

_ws = re.compile(r"\s+")

print(orders["status"].value_counts(dropna=False).head(10))

status_norm = (
    orders["status"].astype("string").str.strip().str.casefold().str.replace(_ws, " ", regex=True)
)
mapping = {"refunded": "refund"}
status_clean = status_norm.map(lambda x: mapping.get(x, x))

print(status_clean.value_counts(dropna=False).head(10))

Duplicates: decide on a rule (don’t guess)

Not all duplicates are exact row duplicates.

First choose a business key, then choose a keep rule.

Common keep rules:

  • keep the last record you saw (simple, explicit)
  • keep the latest by a reliable timestamp (better, but requires parsing)

Pattern: dedupe by key (keep last)

import pandas as pd

def dedupe_keep_last(df: pd.DataFrame, key_cols: list[str]) -> pd.DataFrame:
    return df.drop_duplicates(subset=key_cols, keep="last").reset_index(drop=True)
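
The second keep rule (latest by a reliable timestamp) can be sketched like this — a minimal example, assuming a `created_at`-style column; the function name is illustrative, not from the course repo:

```python
import pandas as pd

def dedupe_keep_latest(df: pd.DataFrame, key: str, ts_col: str) -> pd.DataFrame:
    """Keep one row per key: the one with the latest parseable timestamp."""
    out = df.copy()
    out[ts_col] = pd.to_datetime(out[ts_col], errors="coerce")
    # Unparseable timestamps become NaT; sort them first so they lose to real dates.
    out = out.sort_values(ts_col, na_position="first", kind="stable")
    return out.drop_duplicates(subset=[key], keep="last").reset_index(drop=True)

orders = pd.DataFrame({
    "order_id": ["o1", "o1", "o2"],
    "created_at": ["2025-01-02", "2025-01-05", "not-a-date"],
    "amount": [10, 12, 30],
})
deduped = dedupe_keep_latest(orders, "order_id", "created_at")
print(deduped)  # o1 keeps the 2025-01-05 row; o2 survives even with a bad timestamp
```

This is why the slide calls the timestamp rule “better, but requires parsing”: you have to decide what happens to rows whose timestamp doesn’t parse.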

Micro-exercise: dedupe and check uniqueness (6 minutes)

  1. If order_id duplicates exist, create a deduped DataFrame
  2. Verify it has no duplicate order_id

Checkpoint: deduped["order_id"].duplicated().sum() == 0.

Solution (example)

import pandas as pd

orders = pd.read_parquet("data/processed/orders.parquet")
deduped = orders.drop_duplicates(subset=["order_id"], keep="last")
print("dup after:", int(deduped["order_id"].duplicated().sum()))

Session 2 recap

  • Missingness: measure → decide (drop/impute/flag)
  • Flags are a safe EDA default
  • Normalize categories into *_clean before groupby
  • Dedupe requires a business key + an explicit keep rule

Maghrib break

20 minutes

When you return: we’ll join orders to users safely and write our first Day 2 report.

Session 3

Join + report — safe merges + first EDA summary

Session 3 objectives

By the end of this session, you can:

  • explain common join failures (dtype mismatch, missing keys, duplicate keys → join explosions)
  • join with pd.merge(..., validate=...) and add a simple row-count sanity check
  • measure join coverage with indicator=True (match rate)
  • write a minimal Markdown report you can trust (so EDA is reproducible)

Joins can silently lie

A join usually doesn’t crash. It just changes your row count.

Watch for:

  • unexpected row increases
  • lots of missing values after the join
  • groupbys that look “too good to be true”

Join pitfall #1 — dtype mismatch

If orders.user_id is a string but users.user_id is an int, you can get:

  • a join that matches almost nothing
  • lots of missing country (or other user fields)

Tip

Enforce key dtypes early, then merge.
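
A toy demonstration of the silent failure and the fix (the frames here are made up; your real keys come from the CSVs):

```python
import pandas as pd

orders = pd.DataFrame({"user_id": ["1", "2"], "amount": [10, 20]})              # ids as strings
users = pd.DataFrame({"user_id": pd.Series([1, 2], dtype="object"),             # ids as ints
                      "country": ["SA", "AE"]})

# Same column name, but "1" (str) never equals 1 (int) -> the join matches nothing
bad = orders.merge(users, on="user_id", how="left", indicator=True)
print(bad["_merge"].value_counts())  # all left_only

# Fix: enforce the key dtype on BOTH sides before merging
orders["user_id"] = orders["user_id"].astype("string")
users["user_id"] = users["user_id"].astype("string")
good = orders.merge(users, on="user_id", how="left", indicator=True)
print(good["_merge"].value_counts())  # all both
```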

pd.merge(..., validate=...) (your safety belt)

validate= tells pandas what join cardinality you expect:

  • "one_to_one"
  • "one_to_many"
  • "many_to_one"
  • "many_to_many" (no protection)

For orders → users:

  • many orders per user
  • one user row per user

So we expect: validate="many_to_one".

Micro-exercise: choose the right validate (4 minutes)

You are joining:

  • orders (many rows per user_id)
  • users (one row per user_id)

What should you use?

  A. one_to_one
  B. many_to_one
  C. many_to_many

Checkpoint: choose A/B/C and explain in 1 sentence.

Solution

B) many_to_one — many orders map to one user.

Measure join coverage with indicator=True

Use indicator=True to see whether each row matched:

out = orders.merge(users, on="user_id", how="left", validate="many_to_one", indicator=True)
print(out["_merge"].value_counts())

Micro-exercise: compute the match rate (7 minutes)

  1. After the merge, compute:

    • n_total
    • n_matched (_merge == "both")
    • match_rate = n_matched / n_total

Checkpoint: you can print match_rate as a %.

Solution (example)

n_total = len(out)
n_matched = int((out["_merge"] == "both").sum())
print(f"match_rate: {n_matched / n_total:.1%}")

Join pitfall #2 — duplicate keys on the “one” side

If users.user_id is not unique, your merge can multiply rows.

Two ways to catch it:

  • pre-check uniqueness (assert_unique_key(users, "user_id"))
  • merge with validate="many_to_one" (it will raise)

Quick Check

Question: If pd.merge(..., validate="many_to_one") raises, what’s the most likely cause?

Answer: the right table (users) has duplicate user_id values.
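
You can see the raise on toy data — the duplicate key on the “one” side makes validate= fail fast with pandas.errors.MergeError:

```python
import pandas as pd

orders = pd.DataFrame({"order_id": ["o1", "o2"], "user_id": ["u1", "u1"]})
users = pd.DataFrame({"user_id": ["u1", "u1"], "country": ["SA", "SA"]})  # duplicate key!

try:
    orders.merge(users, on="user_id", how="left", validate="many_to_one")
except pd.errors.MergeError as e:
    print("caught:", e)
```

This is the crash you want: loud, early, and naming the broken assumption.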

Join explosion demo — how totals get inflated

A join can change your row count without errors.

Bad outcome: your aggregates (revenue, counts) become wrong.

import pandas as pd

orders = pd.DataFrame(
    {"order_id": ["o1", "o2"], "user_id": ["u1", "u1"], "amount": [10, 20]}
)

# BUG: users should be unique by user_id, but it's not
users = pd.DataFrame(
    {"user_id": ["u1", "u1"], "country": ["SA", "SA"]}
)

out = orders.merge(users, on="user_id", how="left")  # no validate!
print("rows:", len(orders), "→", len(out))
print("revenue before:", orders["amount"].sum())
print("revenue after :", out["amount"].sum())  # inflated

Fix: enforce uniqueness and use validate=... + row-count checks.

Join safety checklist (operational)

Before the merge:

  • enforce matching key dtypes on both sides
  • check uniqueness on the “one” side (e.g. assert_unique_key(users, "user_id"))

During the merge:

  • use validate=..., suffixes=..., indicator=True

After the merge:

  • compare row counts (a many-to-one left join must not add rows)
  • measure the match rate and inspect unmatched rows

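A minimal sketch wrapping the before/during/after checks into one helper (`safe_left_merge` is an illustrative name, not part of the course repo; the uniqueness check is inlined so the snippet runs standalone):

```python
import pandas as pd

def safe_left_merge(left: pd.DataFrame, right: pd.DataFrame, key: str) -> pd.DataFrame:
    """Left join with join-safety checks: dtypes, uniqueness, cardinality, row count."""
    # Before: matching key dtypes + unique key on the "one" side
    if str(left[key].dtype) != str(right[key].dtype):
        raise ValueError(f"{key}: dtype mismatch ({left[key].dtype} vs {right[key].dtype})")
    if right[key].duplicated().any():
        raise ValueError(f"right.{key}: not unique")

    # During: validate cardinality + keep a match indicator
    out = left.merge(right, on=key, how="left", validate="many_to_one", indicator=True)

    # After: a many-to-one left join must not add rows
    if len(out) != len(left):
        raise ValueError(f"row count changed: {len(left)} -> {len(out)}")
    match_rate = (out["_merge"] == "both").mean()
    print(f"match rate: {match_rate:.1%}")
    return out.drop(columns=["_merge"])

left = pd.DataFrame({"user_id": ["u1", "u1", "u2"], "amount": [10, 20, 30]})
right = pd.DataFrame({"user_id": ["u1", "u2"], "country": ["SA", "AE"]})
enriched = safe_left_merge(left, right, "user_id")
```
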
Reporting: why write a Markdown report?

A report is an artifact you can:

  • review offline
  • diff in Git
  • share with a teammate
  • rerun tomorrow and compare

Tip

If it matters, write it to reports/.

Pattern: build small Markdown tables (Week 1 skill)

Use Week 1 tools (f-strings + str.join) to render tables without extra libraries.

def md_table(headers, rows):
    lines = []
    lines.append("| " + " | ".join(headers) + " |")
    lines.append("| " + " | ".join(["---"] * len(headers)) + " |")
    for row in rows:
        lines.append("| " + " | ".join(str(x) for x in row) + " |")
    return "\n".join(lines)

Micro-exercise: one table for the report (8 minutes)

Create a “Top 5 countries by revenue” table.

  1. Group by country
  2. Sum amount
  3. Take top 5
  4. Render with md_table(...)

Checkpoint: you have a Markdown table string you can print.

Solution (example)

by_country = (
    orders_enriched.groupby("country", dropna=False)["amount"]
    .sum(min_count=1)
    .sort_values(ascending=False)
    .head(5)
)
rows = [(c, f"{v:.2f}" if v == v else "NA") for c, v in by_country.items()]  # v == v is False only for NaN
print(md_table(["country", "revenue"], rows))

Session 3 recap

  • Use validate= so merges fail when cardinality assumptions break
  • Use indicator=True to measure match coverage
  • Write a small report you can trust (and re-run)

Isha break

20 minutes

When you return: we’ll implement Day 2 as committed code + artifacts.

Hands-on

Build: validations + cleaning + join + a Day 2 report

Hands-on success criteria (today)

By the end, you should have:

  • bootcamp_data/checks.py with reusable validations

  • updates to bootcamp_data/transforms.py (flags, normalization, dedupe)

  • a runnable entrypoint: uv run python -m scripts.run_day2_validate_and_report

  • artifacts:

    • data/processed/orders_clean.parquet
    • data/processed/orders_enriched.parquet
    • reports/day2_missingness_orders.csv
    • reports/day2_summary.md
  • at least one commit pushed to GitHub

Project layout (what you add today)

bootcamp_data/
  __init__.py
  config.py
  io.py
  transforms.py            # add helpers
  checks.py                # NEW
scripts/
  __init__.py
  run_day1_load.py
  run_day2_validate_and_report.py   # NEW
reports/
  day2_summary.md
data/
  raw/
  processed/

Vibe coding (safe version)

  1. Write the plan in 5 bullets (no code yet)
  2. Implement the smallest piece
  3. Run → break → read error → fix
  4. Commit
  5. Repeat

Warning

Do not ask GenAI to write your solution code. Ask it to explain concepts or errors.

Task 1 — Create bootcamp_data/checks.py (20 minutes)

Create a new module with these functions:

  • require_columns(df, cols, *, df_name="df")
  • assert_non_empty(df, *, df_name="df")
  • missingness_report(df) -> DataFrame (counts + %)
  • assert_unique_key(df, key, *, allow_na=False, df_name="df")
  • assert_in_range(s, lo=None, hi=None, name="value")

Checkpoint: you can import everything.

macOS/Linux

uv run python -c "from bootcamp_data.checks import require_columns, assert_non_empty, missingness_report, assert_unique_key, assert_in_range; print('checks import: ok')"

Windows PowerShell

uv run python -c "from bootcamp_data.checks import require_columns, assert_non_empty, missingness_report, assert_unique_key, assert_in_range; print('checks import: ok')"

Hint — keep checks readable

Tip

Prefer small functions + clear error messages.

If you can’t explain the check in one sentence, it’s too complicated (for now).

Solution — bootcamp_data/checks.py (part 1)

import pandas as pd

def require_columns(df: pd.DataFrame, cols: list[str], *, df_name: str = "df") -> None:
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"{df_name}: missing columns {missing}")

def assert_non_empty(df: pd.DataFrame, *, df_name: str = "df") -> None:
    if len(df) == 0:
        raise ValueError(f"{df_name}: 0 rows")

def missingness_report(df: pd.DataFrame) -> pd.DataFrame:
    n = len(df)
    rep = df.isna().sum().rename("n_missing").to_frame()
    rep["p_missing"] = rep["n_missing"] / (n if n else 1)
    return rep.sort_values("p_missing", ascending=False)

Solution — bootcamp_data/checks.py (part 2)

import pandas as pd

def assert_unique_key(
    df: pd.DataFrame,
    key: str,
    *,
    allow_na: bool = False,
    df_name: str = "df",
) -> None:
    s = df[key]
    if (not allow_na) and (not s.notna().all()):
        raise ValueError(f"{df_name}.{key}: contains NA")
    dup = s.dropna().duplicated(keep=False)
    if dup.any():
        raise ValueError(f"{df_name}.{key}: not unique")

def assert_in_range(s: pd.Series, lo=None, hi=None, name: str = "value") -> None:
    x = s.dropna()
    if lo is not None and not (x >= lo).all():
        raise ValueError(f"{name}: below {lo}")
    if hi is not None and not (x <= hi).all():
        raise ValueError(f"{name}: above {hi}")

Task 2 — Add helpers to bootcamp_data/transforms.py (20 minutes)

Append these helper functions below your existing enforce_schema:

  • add_missing_flags(df, cols) -> df
  • normalize_text(series) -> series
  • apply_mapping(series, mapping) -> series
  • dedupe_keep_last(df, key_cols) -> df

Checkpoint: you can import them.

macOS/Linux

uv run python -c "from bootcamp_data.transforms import add_missing_flags, normalize_text, apply_mapping, dedupe_keep_last; print('transforms helpers: ok')"

Windows PowerShell

uv run python -c "from bootcamp_data.transforms import add_missing_flags, normalize_text, apply_mapping, dedupe_keep_last; print('transforms helpers: ok')"

Solution — helpers in bootcamp_data/transforms.py

import re
import pandas as pd

_ws = re.compile(r"\s+")

def add_missing_flags(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
    out = df.copy()
    for c in cols:
        out[f"{c}__isna"] = out[c].isna()
    return out

def normalize_text(s: pd.Series) -> pd.Series:
    return s.astype("string").str.strip().str.casefold().str.replace(_ws, " ", regex=True)

def apply_mapping(s: pd.Series, mapping: dict[str, str]) -> pd.Series:
    return s.map(lambda x: mapping.get(x, x))

def dedupe_keep_last(df: pd.DataFrame, key_cols: list[str]) -> pd.DataFrame:
    return df.drop_duplicates(subset=key_cols, keep="last").reset_index(drop=True)

Task 3 — Create scripts/run_day2_validate_and_report.py (40 minutes)

Write a Day 2 entrypoint that:

  1. loads raw CSVs (orders + users)

  2. runs fast checks (required columns + non-empty)

  3. enforces schema for orders

  4. writes a missingness CSV report in reports/

  5. cleans orders:

    • status_clean (normalize + mapping)
    • missing flags (amount, quantity)
    • dedupe by order_id (keep last)
  6. runs post-clean checks (unique order_id, non-negative amount/quantity)

  7. joins to users with validate="many_to_one" + indicator=True

  8. writes:

    • orders_clean.parquet
    • orders_enriched.parquet
    • reports/day2_missingness_orders.csv
    • reports/day2_summary.md

Checkpoint: script runs from repo root:

macOS/Linux

uv run python -m scripts.run_day2_validate_and_report

Windows PowerShell

uv run python -m scripts.run_day2_validate_and_report

Hint — a good order of operations

  • Validate fast things first (columns + non-empty)
  • Enforce types early (so joins don’t fail on dtype mismatch)
  • Clean + dedupe
  • Validate invariants (uniqueness, ranges)
  • Join with validate= + indicator=
  • Write artifacts (Parquet + report)

Warning

Do not “fix” data by deleting everything. Measure first. Keep your actions explicit.

Solution — scripts/run_day2_validate_and_report.py (1/8): imports + setup

import logging
from pathlib import Path

from bootcamp_data.config import make_paths
from bootcamp_data.io import read_orders_csv, read_users_csv, write_parquet
from bootcamp_data.checks import (
    require_columns,
    assert_non_empty,
    missingness_report,
    assert_unique_key,
    assert_in_range,
)
from bootcamp_data.transforms import (
    enforce_schema,
    add_missing_flags,
    normalize_text,
    apply_mapping,
    dedupe_keep_last,
)

log = logging.getLogger(__name__)
ROOT = Path(__file__).resolve().parents[1]

Solution — scripts/run_day2_validate_and_report.py (2/8): helpers + main()

def md_table(headers, rows) -> str:
    lines = []
    lines.append("| " + " | ".join(headers) + " |")
    lines.append("| " + " | ".join(["---"] * len(headers)) + " |")
    for row in rows:
        lines.append("| " + " | ".join(str(x) for x in row) + " |")
    return "\n".join(lines)

def main() -> None:
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
    p = make_paths(ROOT)
    reports_dir = ROOT / "reports"
    reports_dir.mkdir(parents=True, exist_ok=True)
    # (See the next slides for the body of `main()`.)

if __name__ == "__main__":
    main()

Solution — scripts/run_day2_validate_and_report.py (3/8): load + fast checks

    log.info("Load raw inputs")
    orders_raw = read_orders_csv(p.raw / "orders.csv")
    users = read_users_csv(p.raw / "users.csv")

    require_columns(
        orders_raw,
        ["order_id", "user_id", "amount", "quantity", "created_at", "status"],
        df_name="orders_raw",
    )
    require_columns(users, ["user_id", "country", "signup_date"], df_name="users")
    assert_non_empty(orders_raw, df_name="orders_raw")
    assert_non_empty(users, df_name="users")

Solution — scripts/run_day2_validate_and_report.py (4/8): schema + missingness artifact

    orders = enforce_schema(orders_raw)
    users["user_id"] = users["user_id"].astype("string")

    rep = missingness_report(orders)
    rep_path = reports_dir / "day2_missingness_orders.csv"
    rep.to_csv(rep_path, index=True)
    log.info("Wrote missingness report: %s", rep_path)

Solution — scripts/run_day2_validate_and_report.py (5/8): clean + post-checks

    status_norm = normalize_text(orders["status"])
    mapping = {"paid": "paid", "refund": "refund", "refunded": "refund"}

    orders_clean = (
        orders.assign(status_clean=apply_mapping(status_norm, mapping))
        .pipe(add_missing_flags, cols=["amount", "quantity"])
        .pipe(dedupe_keep_last, key_cols=["order_id"])
    )

    assert_unique_key(orders_clean, "order_id", df_name="orders_clean")
    assert_in_range(orders_clean["amount"], lo=0, name="amount")
    assert_in_range(orders_clean["quantity"], lo=0, name="quantity")

Solution — scripts/run_day2_validate_and_report.py (6/8): safe join + metrics

    assert_unique_key(users, "user_id", df_name="users")

    orders_enriched = orders_clean.merge(
        users[["user_id", "country"]],
        on="user_id",
        how="left",
        validate="many_to_one",
        indicator=True,
    )
    orders_enriched["user_found"] = orders_enriched["_merge"] == "both"
    orders_enriched = orders_enriched.drop(columns=["_merge"])

    n_total = len(orders_enriched)
    n_matched = int(orders_enriched["user_found"].sum())
    total_revenue = orders_clean["amount"].sum(min_count=1)

Solution — scripts/run_day2_validate_and_report.py (7/8): build the report

    # quick missingness snapshot (top 10 columns)
    top_missing = (
        rep.head(10)
          .reset_index()
          .rename(columns={"index": "column"})
    )
    missing_rows = [
        (r["column"], int(r["n_missing"]), f"{r['p_missing']:.1%}")
        for _, r in top_missing.iterrows()
    ]

    # duplicates snapshot (before dedupe)
    n_dup_rows = int(orders["order_id"].duplicated(keep=False).sum())
    n_removed_by_dedupe = len(orders) - len(orders_clean)

    # revenue by country (matched users only)
    by_country = (
        orders_enriched.loc[orders_enriched["user_found"]]
        .groupby("country", dropna=False)["amount"]
        .sum(min_count=1)
        .sort_values(ascending=False)
        .head(10)
    )
    country_rows = [
        (c if c == c else "(missing)", f"{v:.2f}" if v == v else "NA")
        for c, v in by_country.items()
    ]

    out_path = reports_dir / "day2_summary.md"
    report = []
    report.append("# Day 2 summary")
    report.append("")
    report.append("## Data checks")
    report.append(f"- orders_raw rows: **{len(orders_raw)}**")
    report.append(f"- orders (typed) rows: **{len(orders)}**")
    report.append(f"- orders_clean rows: **{len(orders_clean)}**")
    report.append(f"- orders_enriched rows: **{len(orders_enriched)}**")
    report.append(f"- duplicate `order_id` rows (pre-dedupe): **{n_dup_rows}**")
    report.append(f"- rows removed by dedupe: **{n_removed_by_dedupe}**")
    report.append(f"- join match rate: **{n_matched / n_total:.1%}** ({n_matched}/{n_total})")
    report.append(f"- missingness report: `{rep_path}`")

    if total_revenue == total_revenue:
        report.append(f"- total revenue (clean): **{float(total_revenue):.2f}**")
    else:
        report.append("- total revenue (clean): **NA**")

    report.append("")
    report.append("## Missingness snapshot (top 10 columns)")
    report.append(md_table(["column", "n_missing", "p_missing"], missing_rows))
    report.append("")
    report.append("## Top countries by revenue (matched users only)")
    report.append(md_table(["country", "revenue"], country_rows))
    report.append("")
    out_path.write_text("\n".join(report), encoding="utf-8")

Solution — scripts/run_day2_validate_and_report.py (8/8): write outputs

    write_parquet(orders_clean, p.processed / "orders_clean.parquet")
    write_parquet(orders_enriched, p.processed / "orders_enriched.parquet")
    write_parquet(users, p.processed / "users.parquet")

    log.info("Wrote processed outputs: %s", p.processed)
    log.info("Wrote report: %s", out_path)

Task 4 — Run + verify artifacts (15 minutes)

Run Day 2 and verify outputs exist.

macOS/Linux

uv run python -m scripts.run_day2_validate_and_report
uv run python -c "import pandas as pd; df=pd.read_parquet('data/processed/orders_enriched.parquet'); print(df[['status_clean','amount__isna','quantity__isna','user_found']].head())"
uv run python -c "import pandas as pd; print(pd.read_csv('reports/day2_missingness_orders.csv').head())"

Windows PowerShell

uv run python -m scripts.run_day2_validate_and_report
uv run python -c "import pandas as pd; df=pd.read_parquet('data/processed/orders_enriched.parquet'); print(df[['status_clean','amount__isna','quantity__isna','user_found']].head())"
uv run python -c "import pandas as pd; print(pd.read_csv('reports/day2_missingness_orders.csv').head())"

Checkpoint: you have:

  • data/processed/orders_clean.parquet
  • data/processed/orders_enriched.parquet
  • reports/day2_missingness_orders.csv
  • reports/day2_summary.md

Git checkpoint (5 minutes)

  • git status
  • commit with message: "w2d2: validate + clean + join + report"
  • push to GitHub

Checkpoint: repo shows your new commit online.

Solution — git commands

macOS/Linux

git add -A
git commit -m "w2d2: validate + clean + join + report"
git push

Windows PowerShell

git add -A
git commit -m "w2d2: validate + clean + join + report"
git push

Debug playbook (Day 2)

When something fails:

  1. Confirm you’re in repo root (ls should show bootcamp_data/ and scripts/)

  2. Re-run the exact command: uv run python -m scripts.run_day2_validate_and_report

  3. Print key facts:

    • df.shape, df.dtypes, df.head()
  4. If merge fails:

    • check key dtypes match
    • check uniqueness on the “one” side (users.user_id)
  5. If a check fails:

    • don’t delete data; inspect the failing rows and decide a rule
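
For step 4, a quick diagnostic sketch — toy frames stand in for your real tables:

```python
import pandas as pd

orders = pd.DataFrame({"user_id": pd.Series(["u1", "u2"], dtype="string")})
users = pd.DataFrame({"user_id": ["u1", "u1"]})  # object dtype, duplicated on purpose

# 1) do the key dtypes match on both sides?
print("orders key:", orders["user_id"].dtype, "| users key:", users["user_id"].dtype)

# 2) is the "one" side actually unique?
n_dup = int(users["user_id"].duplicated(keep=False).sum())
print("duplicate user_id rows in users:", n_dup)
```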

Tip

Most bugs are “my assumption about format was wrong.” Make the assumption explicit, then measure.

Stretch goals (optional)

If you finish early:

  • Add a “Top 10 statuses” table to the report (status_clean counts)
  • Add a “Top 5 missing columns” table to the report (from missingness_report)
  • Add user_found == False count to the report (unmatched users)
  • Add a second report file: reports/day2_checks.md (what checks ran + what they asserted)
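
For the first stretch goal, a sketch of the status-counts table — md_table is repeated from Session 3 so the snippet runs standalone, and the toy Series stands in for your real status_clean column:

```python
import pandas as pd

def md_table(headers, rows):
    lines = ["| " + " | ".join(headers) + " |",
             "| " + " | ".join(["---"] * len(headers)) + " |"]
    for row in rows:
        lines.append("| " + " | ".join(str(x) for x in row) + " |")
    return "\n".join(lines)

status_clean = pd.Series(["paid", "paid", "refund", "pending"], name="status_clean")
counts = status_clean.value_counts(dropna=False).head(10)
table = md_table(["status_clean", "n"], [(k, int(v)) for k, v in counts.items()])
print(table)
```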

Exit Ticket

In 1–2 sentences:

What does validate="many_to_one" protect you from, and what problem does indicator=True help you measure?

What to do after class (Day 2 assignment)

Due: before Day 3 starts

  1. Make sure these commands work from a fresh terminal (repo root):

    • uv run python -m scripts.run_day1_load
    • uv run python -m scripts.run_day2_validate_and_report
  2. Push your changes to GitHub

  3. Confirm these artifacts exist in your repo:

    • bootcamp_data/checks.py
    • data/processed/orders_clean.parquet
    • data/processed/orders_enriched.parquet
    • reports/day2_missingness_orders.csv
    • reports/day2_summary.md (row counts + duplicates + missingness snapshot + join match rate)
  4. Add one dataset-specific check (an invariant you believe should always hold) and include the result in reports/day2_summary.md.

    • Example ideas: status_clean is in an allowed set; a high-percentile cap is reasonable; key columns aren’t missing above X%.

Deliverable: GitHub repo link + a screenshot of reports/day2_summary.md opened.
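
As one possible shape for such a check (the function name and allowed set are illustrative — pick an invariant that fits your dataset):

```python
import pandas as pd

def assert_allowed_values(s: pd.Series, allowed: set, *, name: str = "value") -> None:
    """Fail fast if any non-missing value falls outside the allowed set."""
    bad = sorted(set(s.dropna()) - allowed)
    if bad:
        raise ValueError(f"{name}: unexpected values {bad}")

status_clean = pd.Series(["paid", "refund", "paidd"])  # a typo sneaks in
try:
    assert_allowed_values(status_clean, {"paid", "refund", "pending"}, name="status_clean")
except ValueError as e:
    print(e)  # names the offending value(s)
```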

Tip

Add 1–2 commits with clear messages. Don’t wait until the end of the week.

Thank You!