From 246d8571cade0dd6c15e4c9550daded057782e03 Mon Sep 17 00:00:00 2001 From: Jozsef K Date: Wed, 13 Aug 2025 14:06:26 +0100 Subject: [PATCH 1/9] added RAP modules --- .../${{values.package_name}}/__init__.py | 33 +++++++++++++++++++ .../${{values.package_name}}/disclosure.py | 24 ++++++++++++++ .../${{values.package_name}}/estimation.py | 22 +++++++++++++ .../${{values.package_name}}/ingestion.py | 18 ++++++++++ 4 files changed, 97 insertions(+) create mode 100644 project_template/${{values.package_name}}/__init__.py create mode 100644 project_template/${{values.package_name}}/disclosure.py create mode 100644 project_template/${{values.package_name}}/estimation.py create mode 100644 project_template/${{values.package_name}}/ingestion.py diff --git a/project_template/${{values.package_name}}/__init__.py b/project_template/${{values.package_name}}/__init__.py new file mode 100644 index 0000000..142b4f1 --- /dev/null +++ b/project_template/${{values.package_name}}/__init__.py @@ -0,0 +1,33 @@ +# Import RAP modules +from .disclosure import primary_suppression, rounding +from .estimation import aggregate +from .ingestion import load_table + + + +def run_rap_example(): + """ + Example script to demonstrate RAP workflow using example_data.csv. + Loads data, aggregates statistics, applies disclosure control, and prints results. + """ + import os + # Path to example data + data_path = os.path.join(os.path.dirname(__file__), '..', 'example_data.csv') + data_path = os.path.abspath(data_path) + print(f"Loading data from: {data_path}") + # Load data + df = load_table(data_path) + print("Loaded data:") + print(df.head()) + # Estimate statistics (aggregate) + list_of_groups = ['region', 'category'] # Example group columns from example_data.csv + results = aggregate(df, group_by=list_of_groups, measure='quantity') # Aggregate by quantity + print("Estimation results:") + print(results) + # Apply disclosure control (primary_suppression + rounding) + threshold = 2 # Example: suppress groups with less than 2 items + suppressed = primary_suppression(results, threshold) + base = 5 # Example: round to nearest 5 + rounded = rounding(suppressed, base) + print("Disclosure controlled results:") + print(rounded) diff --git a/project_template/${{values.package_name}}/disclosure.py b/project_template/${{values.package_name}}/disclosure.py new file mode 100644 index 0000000..98f555f --- /dev/null +++ b/project_template/${{values.package_name}}/disclosure.py @@ -0,0 +1,24 @@ +from __future__ import annotations +import math +import pandas as pd + +def primary_suppression(df: pd.DataFrame, threshold: int) -> pd.DataFrame: + """Suppress rows with n < threshold by setting sensitive cells to NaN.""" + out = df.copy() + mask = out["n"] < threshold + for col in ("total", "mean"): + if col in out.columns: + out.loc[mask, col] = pd.NA + out.loc[mask, "suppressed"] = True + out.loc[~mask, "suppressed"] = False + return out + +def rounding(df: pd.DataFrame, base: int) -> pd.DataFrame: + """Round publishable numeric cells to the nearest 'base'.""" + out = df.copy() + for col in ("total", "mean"): + if col in out.columns: + out[col] = out[col].apply( + lambda x: (math.floor((x / base) + 0.5) * base) if pd.notna(x) else x + ) + return out diff --git a/project_template/${{values.package_name}}/estimation.py b/project_template/${{values.package_name}}/estimation.py new file mode 100644 index 0000000..59dd344 --- /dev/null +++ b/project_template/${{values.package_name}}/estimation.py @@ -0,0 +1,22 @@ +from __future__ import annotations +import pandas as pd + +def aggregate(df: pd.DataFrame, group_by: list[str], measure: str) -> pd.DataFrame: + """ + Example 'estimation' step: group and compute count, sum, mean for a measure. + Returns a tidy table with one row per group. + """ + if not set([*group_by, measure]).issubset(df.columns): + missing = set([*group_by, measure]) - set(df.columns) + raise KeyError(f"Missing columns: {missing}") + + out = ( + df.groupby(group_by, dropna=False)[measure] + .agg(n="count", total="sum", mean="mean") + .reset_index() + ) + # enforce column order + result = out.loc[:, [*group_by, "n", "total", "mean"]] + if isinstance(result, pd.Series): + result = result.to_frame().T + return result diff --git a/project_template/${{values.package_name}}/ingestion.py b/project_template/${{values.package_name}}/ingestion.py new file mode 100644 index 0000000..af91468 --- /dev/null +++ b/project_template/${{values.package_name}}/ingestion.py @@ -0,0 +1,18 @@ +from __future__ import annotations +from pathlib import Path +import pandas as pd + +def load_table(path: str | Path) -> pd.DataFrame: + """Load CSV or Parquet into a DataFrame with standardised dtypes.""" + p = Path(path) + if not p.exists(): + raise FileNotFoundError(p) + if p.suffix.lower() in {".csv"}: + df = pd.read_csv(p) + elif p.suffix.lower() in {".parquet", ".pq"}: + df = pd.read_parquet(p) + else: + raise ValueError(f"Unsupported file type: {p.suffix}") + # Normalise column names + df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns] + return df From 7199f04f324da993ef4b1a632119ddb0066b2396 Mon Sep 17 00:00:00 2001 From: Jozsef K Date: Wed, 13 Aug 2025 14:09:21 +0100 Subject: [PATCH 2/9] Fix logging --- .../${{values.package_name}}/__init__.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/project_template/${{values.package_name}}/__init__.py b/project_template/${{values.package_name}}/__init__.py index 142b4f1..ee6636e 100644 --- a/project_template/${{values.package_name}}/__init__.py +++ b/project_template/${{values.package_name}}/__init__.py @@ -1,8 +1,12 @@ + +import logging + # Import RAP modules from .disclosure import primary_suppression, rounding from .estimation import aggregate from .ingestion import load_table +logger = logging.getLogger(__name__) def run_rap_example(): @@ -11,23 +15,25 @@ def run_rap_example(): Loads data, aggregates statistics, applies disclosure control, and prints results. """ import os + import logging + logging.basicConfig(level=logging.INFO) # Path to example data data_path = os.path.join(os.path.dirname(__file__), '..', 'example_data.csv') data_path = os.path.abspath(data_path) - print(f"Loading data from: {data_path}") + logging.info(f"Loading data from: {data_path}") # Load data df = load_table(data_path) - print("Loaded data:") - print(df.head()) + logging.info("Loaded data:") + logging.info(df.head()) # Estimate statistics (aggregate) list_of_groups = ['region', 'category'] # Example group columns from example_data.csv results = aggregate(df, group_by=list_of_groups, measure='quantity') # Aggregate by quantity - print("Estimation results:") - print(results) + logging.info("Estimation results:") + logging.info(results) # Apply disclosure control (primary_suppression + rounding) threshold = 2 # Example: suppress groups with less than 2 items suppressed = primary_suppression(results, threshold) base = 5 # Example: round to nearest 5 rounded = rounding(suppressed, base) - print("Disclosure controlled results:") - print(rounded) + logging.info("Disclosure controlled results:") + logging.info(rounded) From 9c8fb753b5aa6311d5cf7c9129c3b023157a4a13 Mon Sep 17 00:00:00 2001 From: Jozsef K Date: Thu, 14 Aug 2025 09:59:00 +0100 Subject: [PATCH 3/9] Improvements --- .../${{values.package_name}}/__init__.py | 53 +++++-------------- .../${{values.package_name}}/pipeline.py | 43 +++++++++++++++ project_template/configs/pipeline.yaml | 8 +++ 3 files changed, 65 insertions(+), 39 deletions(-) create mode 100644 project_template/${{values.package_name}}/pipeline.py create mode 100644 project_template/configs/pipeline.yaml diff --git a/project_template/${{values.package_name}}/__init__.py b/project_template/${{values.package_name}}/__init__.py index ee6636e..8a1bf7e 100644 --- a/project_template/${{values.package_name}}/__init__.py +++ b/project_template/${{values.package_name}}/__init__.py @@ -1,39 +1,14 @@ - -import logging - -# Import RAP modules -from .disclosure import primary_suppression, rounding -from .estimation import aggregate -from .ingestion import load_table - -logger = logging.getLogger(__name__) - - -def run_rap_example(): - """ - Example script to demonstrate RAP workflow using example_data.csv. - Loads data, aggregates statistics, applies disclosure control, and prints results. - """ - import os - import logging - logging.basicConfig(level=logging.INFO) - # Path to example data - data_path = os.path.join(os.path.dirname(__file__), '..', 'example_data.csv') - data_path = os.path.abspath(data_path) - logging.info(f"Loading data from: {data_path}") - # Load data - df = load_table(data_path) - logging.info("Loaded data:") - logging.info(df.head()) - # Estimate statistics (aggregate) - list_of_groups = ['region', 'category'] # Example group columns from example_data.csv - results = aggregate(df, group_by=list_of_groups, measure='quantity') # Aggregate by quantity - logging.info("Estimation results:") - logging.info(results) - # Apply disclosure control (primary_suppression + rounding) - threshold = 2 # Example: suppress groups with less than 2 items - suppressed = primary_suppression(results, threshold) - base = 5 # Example: round to nearest 5 - rounded = rounding(suppressed, base) - logging.info("Disclosure controlled results:") - logging.info(rounded) +from __future__ import annotations +import argparse +from .pipeline import PipelineConfig, run + +def app(): + parser = argparse.ArgumentParser(prog="{{ package_name }}", description="RAP sample pipeline") + parser.add_argument("command", choices=["run"], help="Command to run") + parser.add_argument("-c", "--config", default="configs/pipeline.yaml", help="Path to config") + args = parser.parse_args() + + if args.command == "run": + cfg = PipelineConfig.from_yaml(args.config) + _, _, _ = run(cfg) + print("Pipeline complete.") \ No newline at end of file diff --git a/project_template/${{values.package_name}}/pipeline.py b/project_template/${{values.package_name}}/pipeline.py new file mode 100644 index 0000000..47734d9 --- /dev/null +++ b/project_template/${{values.package_name}}/pipeline.py @@ -0,0 +1,43 @@ +from __future__ import annotations +from dataclasses import dataclass +from pathlib import Path +import yaml +import pandas as pd + +from .ingestion import load_table +from .estimation import aggregate +from .disclosure import primary_suppression, rounding + +@dataclass(frozen=True) +class PipelineConfig: + input_path: str + output_dir: str + group_by: list[str] + measure: str + disclosure: dict + + @staticmethod + def from_yaml(path: str | Path) -> "PipelineConfig": + data = yaml.safe_load(Path(path).read_text()) + return PipelineConfig(**data) + +def run(config: PipelineConfig) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: + # Ingest + raw = load_table(config.input_path) + + # Estimate + agg = aggregate(raw, config.group_by, config.measure) + + # Disclosure control + th = int(config.disclosure.get("primary_suppression_threshold", 5)) + base = int(config.disclosure.get("rounding_base", 5)) + protected = primary_suppression(agg, th) + published = rounding(protected, base) + + # Persist + out_dir = Path(config.output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + agg.to_csv(out_dir / "aggregates.csv", index=False) + published.to_csv(out_dir / "published.csv", index=False) + + return raw, agg, published diff --git a/project_template/configs/pipeline.yaml b/project_template/configs/pipeline.yaml new file mode 100644 index 0000000..4377041 --- /dev/null +++ b/project_template/configs/pipeline.yaml @@ -0,0 +1,8 @@ +# Minimal, declarative knobs for the sample flow +input_path: "../example_data.csv" +output_dir: "data/output" +group_by: ['region', 'category'] +measure: "value" +disclosure: + primary_suppression_threshold: 2 # suppress groups with < 5 rows + rounding_base: 5 # round published cells to nearest 5 From 9b1160668ba24203942f99fb81918df9b78a5b7d Mon Sep 17 00:00:00 2001 From: Jozsef K Date: Thu, 14 Aug 2025 11:08:43 +0100 Subject: [PATCH 4/9] adding tests --- .../tests/rap_modules/__init__.py | 0 .../tests/unit/test_estimation.py.njk | 22 +++++++++++++++++++ 2 files changed, 22 insertions(+) create mode 100644 project_template/tests/rap_modules/__init__.py create mode 100644 project_template/tests/unit/test_estimation.py.njk diff --git a/project_template/tests/rap_modules/__init__.py b/project_template/tests/rap_modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/project_template/tests/unit/test_estimation.py.njk b/project_template/tests/unit/test_estimation.py.njk new file mode 100644 index 0000000..90b212c --- /dev/null +++ b/project_template/tests/unit/test_estimation.py.njk @@ -0,0 +1,22 @@ + +import pandas as pd +from pathlib import Path +from ${{ values.package_name }}.estimation import aggregate +from ${{ values.package_name }}.ingestion import load_table + +def test_aggregate_group_quantity(): + # Use the actual example_data.csv file + example_path = Path(__file__).parent.parent.parent / "example_data.csv" + df = load_table(example_path) + out = aggregate(df, ["region"], "quantity") + # Check expected columns + assert "region" in out.columns + assert "n" in out.columns + assert "total" in out.columns + assert "mean" in out.columns + # Check that there are results for each region + regions = set(df["region"].unique()) + assert set(out["region"]) == regions + +if __name__ == "__main__": + unittest.main() From 87fa25bf782263a4fbbf9259c1fda346108e294d Mon Sep 17 00:00:00 2001 From: Jozsef K Date: Thu, 14 Aug 2025 11:19:32 +0100 Subject: [PATCH 5/9] more testing --- .../tests/rap_modules/test_rap_modules.py.njk | 41 +++++++++++++++++++ .../tests/unit/test_disclosure.py.njk | 26 ++++++++++++ .../tests/unit/test_ingest.py.njk | 18 ++++++++ 3 files changed, 85 insertions(+) create mode 100644 project_template/tests/rap_modules/test_rap_modules.py.njk create mode 100644 project_template/tests/unit/test_disclosure.py.njk create mode 100644 project_template/tests/unit/test_ingest.py.njk diff --git a/project_template/tests/rap_modules/test_rap_modules.py.njk b/project_template/tests/rap_modules/test_rap_modules.py.njk new file mode 100644 index 0000000..af25ee3 --- /dev/null +++ b/project_template/tests/rap_modules/test_rap_modules.py.njk @@ -0,0 +1,41 @@ +import unittest +from pathlib import Path +from ${{ values.package_name }}.ingestion import load_table +from ${{ values.package_name }}.estimation import aggregate +from ${{ values.package_name }}.disclosure import primary_suppression, rounding + +class TestRAPModulesWorkflow(unittest.TestCase): + def setUp(self): + self.example_path = Path(__file__).parent.parent.parent / "example_data.csv" + self.df = load_table(self.example_path) + self.agg = aggregate(self.df, ["region", "category"], "quantity") + self.suppressed = primary_suppression(self.agg, threshold=2) + self.rounded = rounding(self.suppressed, base=5) + + def test_ingestion(self): + self.assertIsNotNone(self.df) + self.assertGreater(len(self.df), 0) + + def test_estimation(self): + self.assertIsNotNone(self.agg) + self.assertIn("total", self.agg.columns) + self.assertGreater(len(self.agg), 0) + + def test_disclosure(self): + self.assertIn("suppressed", self.suppressed.columns) + self.assertIn("total", self.rounded.columns) + # Check rounding for non-suppressed rows + for t, s in zip(self.rounded["total"], self.rounded["suppressed"]): + if not s: + self.assertEqual(t % 5, 0) + + def test_rap_workflow(self): + df = load_table(self.example_path) + agg = aggregate(df, ["region", "category"], "quantity") + suppressed = primary_suppression(agg, threshold=2) + rounded = rounding(suppressed, base=5) + self.assertGreater(len(rounded), 0) + self.assertIn("total", rounded.columns) + +if __name__ == "__main__": + unittest.main() diff --git a/project_template/tests/unit/test_disclosure.py.njk b/project_template/tests/unit/test_disclosure.py.njk new file mode 100644 index 0000000..24b7899 --- /dev/null +++ b/project_template/tests/unit/test_disclosure.py.njk @@ -0,0 +1,26 @@ +import unittest +import pandas as pd +from pathlib import Path +from ${{ values.package_name }}.disclosure import primary_suppression, rounding +from ${{ values.package_name }}.ingestion import load_table +from ${{ values.package_name }}.estimation import aggregate + +def test_suppression_and_rounding_real_data(): + # Load and aggregate example_data.csv + example_path = Path(__file__).parent.parent.parent / "example_data.csv" + df = load_table(example_path) + agg = aggregate(df, ["region", "category"], "quantity") + # Apply primary suppression + sup = primary_suppression(agg, threshold=2) + # Check that suppressed column exists and is boolean + assert "suppressed" in sup.columns + assert sup["suppressed"].dtype == bool or sup["suppressed"].dtype == object + # Apply rounding + pub = rounding(sup, base=5) + # Check that totals are multiples of 5 (for non-suppressed rows) + for t, s in zip(pub["total"], pub["suppressed"]): + if not s: + assert t % 5 == 0 + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/project_template/tests/unit/test_ingest.py.njk b/project_template/tests/unit/test_ingest.py.njk new file mode 100644 index 0000000..3fd021b --- /dev/null +++ b/project_template/tests/unit/test_ingest.py.njk @@ -0,0 +1,18 @@ +import unittest + +from pathlib import Path + +from ${{ values.package_name }}.ingestion import load_table + +def test_load_table_example_csv(): + # Use the actual example_data.csv file + example_path = Path(__file__).parent.parent.parent / "example_data.csv" + df = load_table(example_path) + # Check expected columns from example_data.csv + expected_columns = {"product_id", "product_name", "category", "quantity", "price", "date", "customer_id", "region", "sales_rep"} + assert set(df.columns) == expected_columns + # Check that the file is not empty + assert len(df) > 0 + +if __name__ == "__main__": + unittest.main() From 91abe0e0eea3ad1d881f3f413f9d29133265ab60 Mon Sep 17 00:00:00 2001 From: Jozsef K Date: Thu, 14 Aug 2025 14:33:22 +0100 Subject: [PATCH 6/9] Fix --- .../${{values.package_name}}/__init__.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/project_template/${{values.package_name}}/__init__.py b/project_template/${{values.package_name}}/__init__.py index 8a1bf7e..c34370e 100644 --- a/project_template/${{values.package_name}}/__init__.py +++ b/project_template/${{values.package_name}}/__init__.py @@ -1,14 +1,27 @@ from __future__ import annotations import argparse +import logging from .pipeline import PipelineConfig, run +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + def app(): parser = argparse.ArgumentParser(prog="{{ package_name }}", description="RAP sample pipeline") parser.add_argument("command", choices=["run"], help="Command to run") parser.add_argument("-c", "--config", default="configs/pipeline.yaml", help="Path to config") args = parser.parse_args() + logger.info(f"Starting pipeline with command: {args.command}") + logger.info(f"Using config file: {args.config}") + if args.command == "run": cfg = PipelineConfig.from_yaml(args.config) + logger.debug(f"Loaded config: {cfg}") _, _, _ = run(cfg) - print("Pipeline complete.") \ No newline at end of file + logger.info("Pipeline complete.") + +__all__ = [ + "PipelineConfig", + "run", +] \ No newline at end of file From 2fee1a8d2c17aa000973bd8e7bda982891c6c7dd Mon Sep 17 00:00:00 2001 From: Jozsef K Date: Thu, 14 Aug 2025 15:05:06 +0100 Subject: [PATCH 7/9] Feature run_rap_module --- .../${{values.package_name}}/pipeline.py | 2 +- project_template/run_rap_module.py.njk | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) create mode 100644 project_template/run_rap_module.py.njk diff --git a/project_template/${{values.package_name}}/pipeline.py b/project_template/${{values.package_name}}/pipeline.py index 47734d9..6cb1aa5 100644 --- a/project_template/${{values.package_name}}/pipeline.py +++ b/project_template/${{values.package_name}}/pipeline.py @@ -29,7 +29,7 @@ def run(config: PipelineConfig) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFram agg = aggregate(raw, config.group_by, config.measure) # Disclosure control - th = int(config.disclosure.get("primary_suppression_threshold", 5)) + th = int(config.disclosure.get("primary_suppression_threshold", 2)) base = int(config.disclosure.get("rounding_base", 5)) protected = primary_suppression(agg, th) published = rounding(protected, base) diff --git a/project_template/run_rap_module.py.njk b/project_template/run_rap_module.py.njk new file mode 100644 index 0000000..386b013 --- /dev/null +++ b/project_template/run_rap_module.py.njk @@ -0,0 +1,46 @@ +"""Demonstration script for the RAP pipeline.""" + +import logging +from pathlib import Path + +from ${{values.package_name}} import ( + disclosure, + estimation, + ingestion, + pipeline, +) + +def main(): + # Define file paths + source_file = "example_data.csv" + output_file = "outputs/processed_data.csv" + + # Configure logging + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger(__name__) + + logger.info("Step 1: Ingestion - Loading data...") + df = ingestion.load_table(source_file) + logger.info(f"Loaded data with shape: {df.shape}") + + logger.info("Step 2: Estimation - Aggregating data...") + group_by = ["region", "category"] + measure = "quantity" + agg = estimation.aggregate(df, group_by=group_by, measure=measure) + logger.info(f"Aggregated data:\n{agg}") + + logger.info("Step 3: Disclosure - Applying primary suppression and rounding...") + threshold = 2 + base = 5 + suppressed = disclosure.primary_suppression(agg, threshold=threshold) + rounded = disclosure.rounding(suppressed, base=base) + logger.info(f"Disclosure controlled results:\n{rounded}") + + # Optionally, save the output + output_path = Path(output_file) + output_path.parent.mkdir(parents=True, exist_ok=True) + rounded.to_csv(output_path, index=False) + logger.info(f"Final output saved to: {output_file}") + +if __name__ == "__main__": + main() From ed8f152d8dc795aa22df01e221351c55cf1780df Mon Sep 17 00:00:00 2001 From: Jozsef K Date: Thu, 14 Aug 2025 15:23:13 +0100 Subject: [PATCH 8/9] added logging --- .../${{values.package_name}}/disclosure.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/project_template/${{values.package_name}}/disclosure.py b/project_template/${{values.package_name}}/disclosure.py index 98f555f..c86fc8b 100644 --- a/project_template/${{values.package_name}}/disclosure.py +++ b/project_template/${{values.package_name}}/disclosure.py @@ -1,14 +1,20 @@ from __future__ import annotations import math +import logging import pandas as pd +logger = logging.getLogger(__name__) + def primary_suppression(df: pd.DataFrame, threshold: int) -> pd.DataFrame: """Suppress rows with n < threshold by setting sensitive cells to NaN.""" out = df.copy() mask = out["n"] < threshold + suppressed_count = mask.sum() + logging.info(f"Applying primary suppression: threshold={threshold}, suppressed rows={suppressed_count}") for col in ("total", "mean"): if col in out.columns: out.loc[mask, col] = pd.NA + logging.debug(f"Suppressed column '{col}' for {suppressed_count} rows.") out.loc[mask, "suppressed"] = True out.loc[~mask, "suppressed"] = False return out @@ -16,9 +22,13 @@ def primary_suppression(df: pd.DataFrame, threshold: int) -> pd.DataFrame: def rounding(df: pd.DataFrame, base: int) -> pd.DataFrame: """Round publishable numeric cells to the nearest 'base'.""" out = df.copy() + logging.info(f"Applying rounding: base={base}") for col in ("total", "mean"): if col in out.columns: + before = out[col].copy() out[col] = out[col].apply( lambda x: (math.floor((x / base) + 0.5) * base) if pd.notna(x) else x ) + changed = (before != out[col]).sum() + logging.debug(f"Rounded column '{col}' for {changed} rows.") return out From 3a418d1d9e4254ac884a88c363cde3500ce0a7d1 Mon Sep 17 00:00:00 2001 From: Jozsef K Date: Thu, 14 Aug 2025 15:25:24 +0100 Subject: [PATCH 9/9] more logging --- .../${{values.package_name}}/estimation.py | 7 +++++++ .../${{values.package_name}}/ingestion.py | 12 +++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/project_template/${{values.package_name}}/estimation.py b/project_template/${{values.package_name}}/estimation.py index 59dd344..74bcfdb 100644 --- a/project_template/${{values.package_name}}/estimation.py +++ b/project_template/${{values.package_name}}/estimation.py @@ -1,6 +1,9 @@ from __future__ import annotations +import logging import pandas as pd +logger = logging.getLogger(__name__) + def aggregate(df: pd.DataFrame, group_by: list[str], measure: str) -> pd.DataFrame: """ Example 'estimation' step: group and compute count, sum, mean for a measure. @@ -8,15 +11,19 @@ def aggregate(df: pd.DataFrame, group_by: list[str], measure: str) -> pd.DataFra """ if not set([*group_by, measure]).issubset(df.columns): missing = set([*group_by, measure]) - set(df.columns) + logger.error(f"Missing columns for aggregation: {missing}") raise KeyError(f"Missing columns: {missing}") + logger.info(f"Aggregating measure '{measure}' by groups {group_by}") out = ( df.groupby(group_by, dropna=False)[measure] .agg(n="count", total="sum", mean="mean") .reset_index() ) + logger.debug(f"Aggregation result:\n{out}") # enforce column order result = out.loc[:, [*group_by, "n", "total", "mean"]] if isinstance(result, pd.Series): result = result.to_frame().T + logger.info(f"Aggregated {len(result)} rows.") return result diff --git a/project_template/${{values.package_name}}/ingestion.py b/project_template/${{values.package_name}}/ingestion.py index af91468..61ee7ef 100644 --- a/project_template/${{values.package_name}}/ingestion.py +++ b/project_template/${{values.package_name}}/ingestion.py @@ -1,18 +1,28 @@ from __future__ import annotations -from pathlib import Path +import logging import pandas as pd +from pathlib import Path + +logger = logging.getLogger(__name__) def load_table(path: str | Path) -> pd.DataFrame: """Load CSV or Parquet into a DataFrame with standardised dtypes.""" p = Path(path) + logger.info(f"Loading data from: {p}") if not p.exists(): + logger.error(f"File not found: {p}") raise FileNotFoundError(p) if p.suffix.lower() in {".csv"}: df = pd.read_csv(p) + logger.info(f"Loaded CSV file with shape: {df.shape}") elif p.suffix.lower() in {".parquet", ".pq"}: df = pd.read_parquet(p) + logger.info(f"Loaded Parquet file with shape: {df.shape}") else: + logger.error(f"Unsupported file type: {p.suffix}") raise ValueError(f"Unsupported file type: {p.suffix}") # Normalise column names + old_columns = list(df.columns) df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns] + logger.debug(f"Normalized columns from {old_columns} to {list(df.columns)}") return df