diff --git a/project_template/${{values.package_name}}/__init__.py b/project_template/${{values.package_name}}/__init__.py new file mode 100644 index 0000000..c34370e --- /dev/null +++ b/project_template/${{values.package_name}}/__init__.py @@ -0,0 +1,27 @@ +from __future__ import annotations +import argparse +import logging +from .pipeline import PipelineConfig, run + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +def app(): + parser = argparse.ArgumentParser(prog="{{ package_name }}", description="RAP sample pipeline") + parser.add_argument("command", choices=["run"], help="Command to run") + parser.add_argument("-c", "--config", default="configs/pipeline.yaml", help="Path to config") + args = parser.parse_args() + + logger.info(f"Starting pipeline with command: {args.command}") + logger.info(f"Using config file: {args.config}") + + if args.command == "run": + cfg = PipelineConfig.from_yaml(args.config) + logger.debug(f"Loaded config: {cfg}") + _, _, _ = run(cfg) + logger.info("Pipeline complete.") + +__all__ = [ + "PipelineConfig", + "run", +] \ No newline at end of file diff --git a/project_template/${{values.package_name}}/disclosure.py b/project_template/${{values.package_name}}/disclosure.py new file mode 100644 index 0000000..c86fc8b --- /dev/null +++ b/project_template/${{values.package_name}}/disclosure.py @@ -0,0 +1,34 @@ +from __future__ import annotations +import math +import logging +import pandas as pd + +logger = logging.getLogger(__name__) + +def primary_suppression(df: pd.DataFrame, threshold: int) -> pd.DataFrame: + """Suppress rows with n < threshold by setting sensitive cells to NaN.""" + out = df.copy() + mask = out["n"] < threshold + suppressed_count = mask.sum() + logging.info(f"Applying primary suppression: threshold={threshold}, suppressed rows={suppressed_count}") + for col in ("total", "mean"): + if col in out.columns: + out.loc[mask, col] = pd.NA + logging.debug(f"Suppressed column '{col}' for {suppressed_count} rows.") + out.loc[mask, "suppressed"] = True + out.loc[~mask, "suppressed"] = False + return out + +def rounding(df: pd.DataFrame, base: int) -> pd.DataFrame: + """Round publishable numeric cells to the nearest 'base'.""" + out = df.copy() + logging.info(f"Applying rounding: base={base}") + for col in ("total", "mean"): + if col in out.columns: + before = out[col].copy() + out[col] = out[col].apply( + lambda x: (math.floor((x / base) + 0.5) * base) if pd.notna(x) else x + ) + changed = (before != out[col]).sum() + logging.debug(f"Rounded column '{col}' for {changed} rows.") + return out diff --git a/project_template/${{values.package_name}}/estimation.py b/project_template/${{values.package_name}}/estimation.py new file mode 100644 index 0000000..74bcfdb --- /dev/null +++ b/project_template/${{values.package_name}}/estimation.py @@ -0,0 +1,29 @@ +from __future__ import annotations +import logging +import pandas as pd + +logger = logging.getLogger(__name__) + +def aggregate(df: pd.DataFrame, group_by: list[str], measure: str) -> pd.DataFrame: + """ + Example 'estimation' step: group and compute count, sum, mean for a measure. + Returns a tidy table with one row per group. + """ + if not set([*group_by, measure]).issubset(df.columns): + missing = set([*group_by, measure]) - set(df.columns) + logger.error(f"Missing columns for aggregation: {missing}") + raise KeyError(f"Missing columns: {missing}") + + logger.info(f"Aggregating measure '{measure}' by groups {group_by}") + out = ( + df.groupby(group_by, dropna=False)[measure] + .agg(n="count", total="sum", mean="mean") + .reset_index() + ) + logger.debug(f"Aggregation result:\n{out}") + # enforce column order + result = out.loc[:, [*group_by, "n", "total", "mean"]] + if isinstance(result, pd.Series): + result = result.to_frame().T + logger.info(f"Aggregated {len(result)} rows.") + return result diff --git a/project_template/${{values.package_name}}/ingestion.py b/project_template/${{values.package_name}}/ingestion.py new file mode 100644 index 0000000..61ee7ef --- /dev/null +++ b/project_template/${{values.package_name}}/ingestion.py @@ -0,0 +1,28 @@ +from __future__ import annotations +import logging +import pandas as pd +from pathlib import Path + +logger = logging.getLogger(__name__) + +def load_table(path: str | Path) -> pd.DataFrame: + """Load CSV or Parquet into a DataFrame with standardised dtypes.""" + p = Path(path) + logger.info(f"Loading data from: {p}") + if not p.exists(): + logger.error(f"File not found: {p}") + raise FileNotFoundError(p) + if p.suffix.lower() in {".csv"}: + df = pd.read_csv(p) + logger.info(f"Loaded CSV file with shape: {df.shape}") + elif p.suffix.lower() in {".parquet", ".pq"}: + df = pd.read_parquet(p) + logger.info(f"Loaded Parquet file with shape: {df.shape}") + else: + logger.error(f"Unsupported file type: {p.suffix}") + raise ValueError(f"Unsupported file type: {p.suffix}") + # Normalise column names + old_columns = list(df.columns) + df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns] + logger.debug(f"Normalized columns from {old_columns} to {list(df.columns)}") + return df diff --git a/project_template/${{values.package_name}}/pipeline.py b/project_template/${{values.package_name}}/pipeline.py new file mode 100644 index 0000000..6cb1aa5 --- /dev/null +++ b/project_template/${{values.package_name}}/pipeline.py @@ -0,0 +1,43 @@ +from __future__ import annotations +from dataclasses import dataclass +from pathlib import Path +import yaml +import pandas as pd + +from .ingestion import load_table +from .estimation import aggregate +from .disclosure import primary_suppression, rounding + +@dataclass(frozen=True) +class PipelineConfig: + input_path: str + output_dir: str + group_by: list[str] + measure: str + disclosure: dict + + @staticmethod + def from_yaml(path: str | Path) -> "PipelineConfig": + data = yaml.safe_load(Path(path).read_text()) + return PipelineConfig(**data) + +def run(config: PipelineConfig) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: + # Ingest + raw = load_table(config.input_path) + + # Estimate + agg = aggregate(raw, config.group_by, config.measure) + + # Disclosure control + th = int(config.disclosure.get("primary_suppression_threshold", 2)) + base = int(config.disclosure.get("rounding_base", 5)) + protected = primary_suppression(agg, th) + published = rounding(protected, base) + + # Persist + out_dir = Path(config.output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + agg.to_csv(out_dir / "aggregates.csv", index=False) + published.to_csv(out_dir / "published.csv", index=False) + + return raw, agg, published diff --git a/project_template/configs/pipeline.yaml b/project_template/configs/pipeline.yaml new file mode 100644 index 0000000..4377041 --- /dev/null +++ b/project_template/configs/pipeline.yaml @@ -0,0 +1,8 @@ +# Minimal, declarative knobs for the sample flow +input_path: "../example_data.csv" +output_dir: "data/output" +group_by: ['region', 'category'] +measure: "value" +disclosure: + primary_suppression_threshold: 2 # suppress groups with < 5 rows + rounding_base: 5 # round published cells to nearest 5 diff --git a/project_template/run_rap_module.py.njk b/project_template/run_rap_module.py.njk new file mode 100644 index 0000000..386b013 --- /dev/null +++ b/project_template/run_rap_module.py.njk @@ -0,0 +1,46 @@ +"""Demonstration script for the RAP pipeline.""" + +import logging +from pathlib import Path + +from ${{values.package_name}} import ( + disclosure, + estimation, + ingestion, + pipeline, +) + +def main(): + # Define file paths + source_file = "example_data.csv" + output_file = "outputs/processed_data.csv" + + # Configure logging + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger(__name__) + + logger.info("Step 1: Ingestion - Loading data...") + df = ingestion.load_table(source_file) + logger.info(f"Loaded data with shape: {df.shape}") + + logger.info("Step 2: Estimation - Aggregating data...") + group_by = ["region", "category"] + measure = "quantity" + agg = estimation.aggregate(df, group_by=group_by, measure=measure) + logger.info(f"Aggregated data:\n{agg}") + + logger.info("Step 3: Disclosure - Applying primary suppression and rounding...") + threshold = 2 + base = 5 + suppressed = disclosure.primary_suppression(agg, threshold=threshold) + rounded = disclosure.rounding(suppressed, base=base) + logger.info(f"Disclosure controlled results:\n{rounded}") + + # Optionally, save the output + output_path = Path(output_file) + output_path.parent.mkdir(parents=True, exist_ok=True) + rounded.to_csv(output_path, index=False) + logger.info(f"Final output saved to: {output_file}") + +if __name__ == "__main__": + main() diff --git a/project_template/tests/rap_modules/__init__.py b/project_template/tests/rap_modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/project_template/tests/rap_modules/test_rap_modules.py.njk b/project_template/tests/rap_modules/test_rap_modules.py.njk new file mode 100644 index 0000000..af25ee3 --- /dev/null +++ b/project_template/tests/rap_modules/test_rap_modules.py.njk @@ -0,0 +1,41 @@ +import unittest +from pathlib import Path +from ${{ values.package_name }}.ingestion import load_table +from ${{ values.package_name }}.estimation import aggregate +from ${{ values.package_name }}.disclosure import primary_suppression, rounding + +class TestRAPModulesWorkflow(unittest.TestCase): + def setUp(self): + self.example_path = Path(__file__).parent.parent.parent / "example_data.csv" + self.df = load_table(self.example_path) + self.agg = aggregate(self.df, ["region", "category"], "quantity") + self.suppressed = primary_suppression(self.agg, threshold=2) + self.rounded = rounding(self.suppressed, base=5) + + def test_ingestion(self): + self.assertIsNotNone(self.df) + self.assertGreater(len(self.df), 0) + + def test_estimation(self): + self.assertIsNotNone(self.agg) + self.assertIn("total", self.agg.columns) + self.assertGreater(len(self.agg), 0) + + def test_disclosure(self): + self.assertIn("suppressed", self.suppressed.columns) + self.assertIn("total", self.rounded.columns) + # Check rounding for non-suppressed rows + for t, s in zip(self.rounded["total"], self.rounded["suppressed"]): + if not s: + self.assertEqual(t % 5, 0) + + def test_rap_workflow(self): + df = load_table(self.example_path) + agg = aggregate(df, ["region", "category"], "quantity") + suppressed = primary_suppression(agg, threshold=2) + rounded = rounding(suppressed, base=5) + self.assertGreater(len(rounded), 0) + self.assertIn("total", rounded.columns) + +if __name__ == "__main__": + unittest.main() diff --git a/project_template/tests/unit/test_disclosure.py.njk b/project_template/tests/unit/test_disclosure.py.njk new file mode 100644 index 0000000..24b7899 --- /dev/null +++ b/project_template/tests/unit/test_disclosure.py.njk @@ -0,0 +1,26 @@ +import unittest +import pandas as pd +from pathlib import Path +from ${{ values.package_name }}.disclosure import primary_suppression, rounding +from ${{ values.package_name }}.ingestion import load_table +from ${{ values.package_name }}.estimation import aggregate + +def test_suppression_and_rounding_real_data(): + # Load and aggregate example_data.csv + example_path = Path(__file__).parent.parent.parent / "example_data.csv" + df = load_table(example_path) + agg = aggregate(df, ["region", "category"], "quantity") + # Apply primary suppression + sup = primary_suppression(agg, threshold=2) + # Check that suppressed column exists and is boolean + assert "suppressed" in sup.columns + assert sup["suppressed"].dtype == bool or sup["suppressed"].dtype == object + # Apply rounding + pub = rounding(sup, base=5) + # Check that totals are multiples of 5 (for non-suppressed rows) + for t, s in zip(pub["total"], pub["suppressed"]): + if not s: + assert t % 5 == 0 + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/project_template/tests/unit/test_estimation.py.njk b/project_template/tests/unit/test_estimation.py.njk new file mode 100644 index 0000000..90b212c --- /dev/null +++ b/project_template/tests/unit/test_estimation.py.njk @@ -0,0 +1,22 @@ + +import pandas as pd +from pathlib import Path +from ${{ values.package_name }}.estimation import aggregate +from ${{ values.package_name }}.ingestion import load_table + +def test_aggregate_group_quantity(): + # Use the actual example_data.csv file + example_path = Path(__file__).parent.parent.parent / "example_data.csv" + df = load_table(example_path) + out = aggregate(df, ["region"], "quantity") + # Check expected columns + assert "region" in out.columns + assert "n" in out.columns + assert "total" in out.columns + assert "mean" in out.columns + # Check that there are results for each region + regions = set(df["region"].unique()) + assert set(out["region"]) == regions + +if __name__ == "__main__": + unittest.main() diff --git a/project_template/tests/unit/test_ingest.py.njk b/project_template/tests/unit/test_ingest.py.njk new file mode 100644 index 0000000..3fd021b --- /dev/null +++ b/project_template/tests/unit/test_ingest.py.njk @@ -0,0 +1,18 @@ +import unittest + +from pathlib import Path + +from ${{ values.package_name }}.ingestion import load_table + +def test_load_table_example_csv(): + # Use the actual example_data.csv file + example_path = Path(__file__).parent.parent.parent / "example_data.csv" + df = load_table(example_path) + # Check expected columns from example_data.csv + expected_columns = {"product_id", "product_name", "category", "quantity", "price", "date", "customer_id", "region", "sales_rep"} + assert set(df.columns) == expected_columns + # Check that the file is not empty + assert len(df) > 0 + +if __name__ == "__main__": + unittest.main()