27 changes: 27 additions & 0 deletions project_template/${{values.package_name}}/__init__.py
@@ -0,0 +1,27 @@
from __future__ import annotations
import argparse
import logging
from .pipeline import PipelineConfig, run

logger = logging.getLogger(__name__)

def app():
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(prog="${{values.package_name}}", description="RAP sample pipeline")
parser.add_argument("command", choices=["run"], help="Command to run")
parser.add_argument("-c", "--config", default="configs/pipeline.yaml", help="Path to config")
args = parser.parse_args()

logger.info(f"Starting pipeline with command: {args.command}")
logger.info(f"Using config file: {args.config}")

if args.command == "run":
cfg = PipelineConfig.from_yaml(args.config)
logger.debug(f"Loaded config: {cfg}")
        run(cfg)
logger.info("Pipeline complete.")

__all__ = [
"PipelineConfig",
"run",
]
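Note: a minimal sketch of how this entry point could be exercised once the template is rendered. Nothing below is part of the diff; "my_package" stands in for the rendered ${{values.package_name}}, and the console-script wiring is assumed rather than shown here.

import sys
from my_package import app  # hypothetical rendered package name

sys.argv = ["my_package", "run", "-c", "configs/pipeline.yaml"]
app()  # argparse reads sys.argv: "run" loads the YAML config and calls run(cfg)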
34 changes: 34 additions & 0 deletions project_template/${{values.package_name}}/disclosure.py
@@ -0,0 +1,34 @@
from __future__ import annotations
import math
import logging
import pandas as pd

logger = logging.getLogger(__name__)

def primary_suppression(df: pd.DataFrame, threshold: int) -> pd.DataFrame:
"""Suppress rows with n < threshold by setting sensitive cells to NaN."""
out = df.copy()
mask = out["n"] < threshold
suppressed_count = mask.sum()
logging.info(f"Applying primary suppression: threshold={threshold}, suppressed rows={suppressed_count}")
for col in ("total", "mean"):
if col in out.columns:
out.loc[mask, col] = pd.NA
logging.debug(f"Suppressed column '{col}' for {suppressed_count} rows.")
out.loc[mask, "suppressed"] = True
out.loc[~mask, "suppressed"] = False
return out

def rounding(df: pd.DataFrame, base: int) -> pd.DataFrame:
"""Round publishable numeric cells to the nearest 'base'."""
out = df.copy()
logging.info(f"Applying rounding: base={base}")
for col in ("total", "mean"):
if col in out.columns:
before = out[col].copy()
out[col] = out[col].apply(
lambda x: (math.floor((x / base) + 0.5) * base) if pd.notna(x) else x
)
            # don't count suppressed (NA/NaN) cells as changed
            changed = int(((before != out[col]) & before.notna()).sum())
            logger.debug(f"Rounded column '{col}' for {changed} rows.")
return out
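Note: the rounding helper above uses floor(x / base + 0.5) * base, i.e. half-up rounding, which differs from Python's built-in banker's rounding. A quick sanity check, illustrative only:

import math

def round_half_up(x: float, base: int) -> float:
    return math.floor((x / base) + 0.5) * base

assert round_half_up(12, 5) == 10    # 12/5 + 0.5 = 2.9 -> floor 2 -> 10
assert round_half_up(13, 5) == 15    # 13/5 + 0.5 = 3.1 -> floor 3 -> 15
assert round_half_up(12.5, 5) == 15  # ties go up; banker's rounding would give 10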
29 changes: 29 additions & 0 deletions project_template/${{values.package_name}}/estimation.py
@@ -0,0 +1,29 @@
from __future__ import annotations
import logging
import pandas as pd

logger = logging.getLogger(__name__)

def aggregate(df: pd.DataFrame, group_by: list[str], measure: str) -> pd.DataFrame:
"""
Example 'estimation' step: group and compute count, sum, mean for a measure.
Returns a tidy table with one row per group.
"""
    missing = set([*group_by, measure]) - set(df.columns)
    if missing:
        logger.error(f"Missing columns for aggregation: {missing}")
        raise KeyError(f"Missing columns: {missing}")

logger.info(f"Aggregating measure '{measure}' by groups {group_by}")
out = (
df.groupby(group_by, dropna=False)[measure]
.agg(n="count", total="sum", mean="mean")
.reset_index()
)
logger.debug(f"Aggregation result:\n{out}")
    # enforce column order; a list selector always yields a DataFrame
    result = out.loc[:, [*group_by, "n", "total", "mean"]]
    logger.info(f"Aggregated {len(result)} rows.")
return result
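Note: a tiny worked example of aggregate() on made-up data (not from the diff; assumes aggregate has been imported from the rendered package's estimation module):

import pandas as pd

df = pd.DataFrame({
    "region": ["north", "north", "south"],
    "value": [10, 20, 30],
})
print(aggregate(df, ["region"], "value"))
#   region  n  total  mean
# 0  north  2     30  15.0
# 1  south  1     30  30.0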
28 changes: 28 additions & 0 deletions project_template/${{values.package_name}}/ingestion.py
@@ -0,0 +1,28 @@
from __future__ import annotations
import logging
import pandas as pd
from pathlib import Path

logger = logging.getLogger(__name__)

def load_table(path: str | Path) -> pd.DataFrame:
"""Load CSV or Parquet into a DataFrame with standardised dtypes."""
p = Path(path)
logger.info(f"Loading data from: {p}")
if not p.exists():
logger.error(f"File not found: {p}")
raise FileNotFoundError(p)
if p.suffix.lower() in {".csv"}:
df = pd.read_csv(p)
logger.info(f"Loaded CSV file with shape: {df.shape}")
elif p.suffix.lower() in {".parquet", ".pq"}:
df = pd.read_parquet(p)
logger.info(f"Loaded Parquet file with shape: {df.shape}")
else:
logger.error(f"Unsupported file type: {p.suffix}")
raise ValueError(f"Unsupported file type: {p.suffix}")
# Normalise column names
old_columns = list(df.columns)
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
logger.debug(f"Normalized columns from {old_columns} to {list(df.columns)}")
return df
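Note: the column normalisation at the end of load_table, illustrated with a hypothetical header row:

cols = ["Product ID", " Region ", "Sales Rep"]
print([c.strip().lower().replace(" ", "_") for c in cols])
# -> ['product_id', 'region', 'sales_rep']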
43 changes: 43 additions & 0 deletions project_template/${{values.package_name}}/pipeline.py
@@ -0,0 +1,43 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import yaml
import pandas as pd

from .ingestion import load_table
from .estimation import aggregate
from .disclosure import primary_suppression, rounding

@dataclass(frozen=True)
class PipelineConfig:
input_path: str
output_dir: str
group_by: list[str]
measure: str
disclosure: dict

@staticmethod
def from_yaml(path: str | Path) -> "PipelineConfig":
data = yaml.safe_load(Path(path).read_text())
return PipelineConfig(**data)

def run(config: PipelineConfig) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
# Ingest
raw = load_table(config.input_path)

# Estimate
agg = aggregate(raw, config.group_by, config.measure)

# Disclosure control
th = int(config.disclosure.get("primary_suppression_threshold", 2))
base = int(config.disclosure.get("rounding_base", 5))
protected = primary_suppression(agg, th)
published = rounding(protected, base)

# Persist
out_dir = Path(config.output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
agg.to_csv(out_dir / "aggregates.csv", index=False)
published.to_csv(out_dir / "published.csv", index=False)

return raw, agg, published
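Note: given the configs/pipeline.yaml below, from_yaml would produce roughly the following (a sketch; assumes PipelineConfig and run imported from the rendered package):

cfg = PipelineConfig.from_yaml("configs/pipeline.yaml")
# PipelineConfig(input_path='../example_data.csv', output_dir='data/output',
#                group_by=['region', 'category'], measure='value',
#                disclosure={'primary_suppression_threshold': 2, 'rounding_base': 5})
raw, agg, published = run(cfg)  # writes aggregates.csv and published.csv to output_dir

Since run() reads the disclosure keys with .get(), a config that omits them falls back to the defaults (threshold 2, base 5).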
8 changes: 8 additions & 0 deletions project_template/configs/pipeline.yaml
@@ -0,0 +1,8 @@
# Minimal, declarative knobs for the sample flow
input_path: "../example_data.csv"
output_dir: "data/output"
group_by: ['region', 'category']
measure: "value"
disclosure:
  primary_suppression_threshold: 2 # suppress groups with n < 2
rounding_base: 5 # round published cells to nearest 5
46 changes: 46 additions & 0 deletions project_template/run_rap_module.py.njk
@@ -0,0 +1,46 @@
"""Demonstration script for the RAP pipeline."""

import logging
from pathlib import Path

from ${{values.package_name}} import (
    disclosure,
    estimation,
    ingestion,
)

def main():
# Define file paths
source_file = "example_data.csv"
output_file = "outputs/processed_data.csv"

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("Step 1: Ingestion - Loading data...")
df = ingestion.load_table(source_file)
logger.info(f"Loaded data with shape: {df.shape}")

logger.info("Step 2: Estimation - Aggregating data...")
group_by = ["region", "category"]
measure = "quantity"
agg = estimation.aggregate(df, group_by=group_by, measure=measure)
logger.info(f"Aggregated data:\n{agg}")

logger.info("Step 3: Disclosure - Applying primary suppression and rounding...")
threshold = 2
base = 5
suppressed = disclosure.primary_suppression(agg, threshold=threshold)
rounded = disclosure.rounding(suppressed, base=base)
logger.info(f"Disclosure controlled results:\n{rounded}")

    # Save the output
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
rounded.to_csv(output_path, index=False)
logger.info(f"Final output saved to: {output_file}")

if __name__ == "__main__":
main()
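Note: the script assumes example_data.csv sits in the working directory; judging from the ingestion test further down, that file carries at least these columns, of which "quantity" is the measure aggregated here:

# expected example_data.csv header (inferred from test_ingest below):
# product_id,product_name,category,quantity,price,date,customer_id,region,sales_rep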
41 changes: 41 additions & 0 deletions project_template/tests/rap_modules/test_rap_modules.py.njk
@@ -0,0 +1,41 @@
import unittest
from pathlib import Path
from ${{ values.package_name }}.ingestion import load_table
from ${{ values.package_name }}.estimation import aggregate
from ${{ values.package_name }}.disclosure import primary_suppression, rounding

class TestRAPModulesWorkflow(unittest.TestCase):
def setUp(self):
self.example_path = Path(__file__).parent.parent.parent / "example_data.csv"
self.df = load_table(self.example_path)
self.agg = aggregate(self.df, ["region", "category"], "quantity")
self.suppressed = primary_suppression(self.agg, threshold=2)
self.rounded = rounding(self.suppressed, base=5)

def test_ingestion(self):
self.assertIsNotNone(self.df)
self.assertGreater(len(self.df), 0)

def test_estimation(self):
self.assertIsNotNone(self.agg)
self.assertIn("total", self.agg.columns)
self.assertGreater(len(self.agg), 0)

def test_disclosure(self):
self.assertIn("suppressed", self.suppressed.columns)
self.assertIn("total", self.rounded.columns)
# Check rounding for non-suppressed rows
for t, s in zip(self.rounded["total"], self.rounded["suppressed"]):
if not s:
self.assertEqual(t % 5, 0)

def test_rap_workflow(self):
df = load_table(self.example_path)
agg = aggregate(df, ["region", "category"], "quantity")
suppressed = primary_suppression(agg, threshold=2)
rounded = rounding(suppressed, base=5)
self.assertGreater(len(rounded), 0)
self.assertIn("total", rounded.columns)

if __name__ == "__main__":
unittest.main()
26 changes: 26 additions & 0 deletions project_template/tests/unit/test_disclosure.py.njk
@@ -0,0 +1,26 @@
from pathlib import Path
from ${{ values.package_name }}.disclosure import primary_suppression, rounding
from ${{ values.package_name }}.ingestion import load_table
from ${{ values.package_name }}.estimation import aggregate

def test_suppression_and_rounding_real_data():
# Load and aggregate example_data.csv
example_path = Path(__file__).parent.parent.parent / "example_data.csv"
df = load_table(example_path)
agg = aggregate(df, ["region", "category"], "quantity")
# Apply primary suppression
sup = primary_suppression(agg, threshold=2)
    # Check that the suppressed column exists and is boolean-like
assert "suppressed" in sup.columns
assert sup["suppressed"].dtype == bool or sup["suppressed"].dtype == object
# Apply rounding
pub = rounding(sup, base=5)
# Check that totals are multiples of 5 (for non-suppressed rows)
for t, s in zip(pub["total"], pub["suppressed"]):
if not s:
assert t % 5 == 0

22 changes: 22 additions & 0 deletions project_template/tests/unit/test_estimation.py.njk
@@ -0,0 +1,22 @@

from pathlib import Path
from ${{ values.package_name }}.estimation import aggregate
from ${{ values.package_name }}.ingestion import load_table

def test_aggregate_group_quantity():
# Use the actual example_data.csv file
example_path = Path(__file__).parent.parent.parent / "example_data.csv"
df = load_table(example_path)
out = aggregate(df, ["region"], "quantity")
# Check expected columns
assert "region" in out.columns
assert "n" in out.columns
assert "total" in out.columns
assert "mean" in out.columns
# Check that there are results for each region
regions = set(df["region"].unique())
assert set(out["region"]) == regions

18 changes: 18 additions & 0 deletions project_template/tests/unit/test_ingest.py.njk
@@ -0,0 +1,18 @@
from pathlib import Path

from ${{ values.package_name }}.ingestion import load_table

def test_load_table_example_csv():
# Use the actual example_data.csv file
example_path = Path(__file__).parent.parent.parent / "example_data.csv"
df = load_table(example_path)
# Check expected columns from example_data.csv
expected_columns = {"product_id", "product_name", "category", "quantity", "price", "date", "customer_id", "region", "sales_rep"}
assert set(df.columns) == expected_columns
# Check that the file is not empty
assert len(df) > 0

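Note: once the template is rendered, these files are plain pytest-style tests (plus the unittest workflow suite above), so a typical invocation from the project root would simply be:

pytest tests/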