AE-956 [ads] Collect DAP results and add to BigQuery table #474
Merged: gleonard-m merged 14 commits into main from AE-956-etl-read-results-from-attribution-tasks-into-bq on Jan 27, 2026.
Commits (14):

- 2139155 Initial version of ads-attribution-dap-collector job (gleonard-m)
- 2c34e49 Initial version of ads-attribution-dap-collector job (gleonard-m)
- 42e9be7 Downgraded pytest version (gleonard-m)
- 64160ab Downgraded python version (gleonard-m)
- 1f19211 Merge branch 'main' into AE-956-etl-read-results-from-attribution-tas… (gleonard-m)
- 778df84 Updated pytest version (gleonard-m)
- 3864126 Merge remote-tracking branch 'origin/AE-956-etl-read-results-from-att… (gleonard-m)
- 6f38257 Use python 3.12 and update ci_job.yml (gleonard-m)
- 4c49a27 Update job-ads-attribution-dap-collector.yml (gleonard-m)
- 16fab7c Renamed schema classes (gleonard-m)
- 45700ed Trailing _ in test function (gleonard-m)
- 305a6cb Relaxed schema for extra fields (gleonard-m)
- 714325f Update README.md to include a config example (gleonard-m)
- 2b0a4d6 updated ad_id type from int to string (gleonard-m)
New file: .github/workflows/job-ads-attribution-dap-collector.yml (@@ -0,0 +1,58 @@)

```yaml
###
# This file was generated by docker-etl/ci_config.py.
# Changes should be made to job ci_job.yaml files and re-generated.
###

name: ads-attribution-dap-collector

on:
  push:
    branches:
      - main
    paths:
      - 'jobs/ads-attribution-dap-collector/**'
      - '.github/workflows/job-ads-attribution-dap-collector.yml'
  pull_request:
    paths:
      - 'jobs/ads-attribution-dap-collector/**'
      - '.github/workflows/job-ads-attribution-dap-collector.yml'

jobs:
  build-job-ads-attribution-dap-collector:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v6

      - name: Build the Docker image
        # yamllint disable
        run: |
          docker build jobs/ads-attribution-dap-collector -t us-docker.pkg.dev/moz-fx-data-artifacts-prod/docker-etl/ads-attribution-dap-collector:latest
        # yamllint enable
      - name: Test Code
        run: docker run us-docker.pkg.dev/moz-fx-data-artifacts-prod/docker-etl/ads-attribution-dap-collector:latest python3 -m pytest

  push-job-ads-attribution-dap-collector:
    runs-on: ubuntu-latest
    needs: build-job-ads-attribution-dap-collector
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Checkout code
        uses: actions/checkout@v6

      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_CREDENTIALS }}

      - name: Set up Cloud SDK
        uses: google-github-actions/setup-gcloud@v2

      - name: Configure Docker for GCR
        run: gcloud auth configure-docker

      - name: Build Docker image
        run: docker build jobs/ads-attribution-dap-collector/ -t gcr.io/$${{ secrets.GCP_PROJECT }}/ads-attribution-dap-collector_docker_etl:latest

      - name: Push to GCR
        run: docker push gcr.io/$${{ secrets.GCP_PROJECT }}/ads-attribution-dap-collector_docker_etl:latest
```
New file (@@ -0,0 +1,7 @@):

```
.ci_job.yaml
.ci_workflow.yaml
.DS_Store
*.pyc
.pytest_cache/
__pycache__/
venv/
```
New file (@@ -0,0 +1,2 @@):

```ini
[flake8]
max-line-length = 88
```
New file (@@ -0,0 +1,4 @@):

```
.DS_Store
*.pyc
__pycache__/
venv/
```
New file: Dockerfile (@@ -0,0 +1,42 @@)

```dockerfile
FROM python:3.12
LABEL maintainer="Glenda Leonard <gleonard@mozilla.com>"
ARG HOME="/janus_build"
WORKDIR ${HOME}

RUN apt update && apt --yes install curl

RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
ENV PATH=$HOME/.cargo/bin:$PATH

# build the CLI tool
RUN git clone --depth 1 https://github.com/divviup/janus.git --branch '0.7.69'
RUN cd janus && cargo build -r -p janus_tools --bin collect

######### next stage

FROM python:3.12
LABEL maintainer="Glenda Leonard <gleonard@mozilla.com>"
# https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md
ARG USER_ID="10001"
ARG GROUP_ID="app"
ARG HOME="/app"
ENV HOME=${HOME}

RUN groupadd --gid ${USER_ID} ${GROUP_ID} && \
    useradd --create-home --uid ${USER_ID} --gid ${GROUP_ID} --home-dir ${HOME} ${GROUP_ID}

WORKDIR ${HOME}
COPY --from=0 /janus_build/janus/target/release/collect ./

RUN pip install --upgrade pip

COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

COPY . .

RUN pip install .

# Drop root and change ownership of the application folder to the user
RUN chown -R ${USER_ID}:${GROUP_ID} ${HOME}
USER ${USER_ID}
```
New file: README.md (@@ -0,0 +1,83 @@)

````markdown
# Ads Attribution DAP Collection Job

This job collects metrics from DAP and writes the results to BigQuery.

## Overview
This job is driven by a config file from a GCS bucket. Use `job_config_gcp_project`
and `job_config_bucket` to specify the file. The config file must be named
`attribution-conf.json` and a sample is available [here](https://github.com/mozilla-services/mars/tree/main/internal/gcp/storage/testdata/mars-attribution-config).

## Usage

This script is intended to be run in a docker container.

It requires setup of some environment variables that hold DAP credentials, and the job will look for those when it
starts up. A dev script, `dev_run_docker.sh`, is included for convenience to build and run the job locally, and it
also documents those variables.

Once the environment variables are set up, run the job with:

```sh
./dev_run_docker.sh
```
To just build the docker image, use:
```
docker build -t ads-attribution-dap-collector .
```

Sample attribution-conf.json file
```json
{
  "collection_config": {
    "hpke_config": "hpke-config"
  },
  "advertisers": [
    {
      "name": "mozilla",
      "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa1234",
      "start_date": "2026-01-08",
      "collector_duration": 604800,
      "conversion_type": "view",
      "lookback_window": 7
    }
  ],
  "partners": {
    "295beef7-1e3b-4128-b8f8-858e12aa1234": {
      "task_id": "<task_id>",
      "vdaf": "histogram",
      "bits": 0,
      "length": 40,
      "time_precision": 60,
      "default_measurement": 0
    }
  },
  "ads": {
    "provider:1234": {
      "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa1234",
      "index": 1
    }
  }
}
```

## Testing

First create the job venv using
```
python -m venv ./venv
source ./venv/bin/activate
pip install -r requirements.txt
```
Run tests from `/jobs/ads-attribution-dap-collector` using:
`python -m pytest`

## Linting and Formatting
```
black .
```
```
flake8 .
```
````
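For orientation, here is a minimal sketch of how a job driven by this config might fetch `attribution-conf.json` from the location named by `job_config_gcp_project` and `job_config_bucket`. It assumes the `google-cloud-storage` client; the function name and module layout are illustrative, not taken from this PR.

```python
# Hypothetical helper, not part of this PR: download and parse the job config.
import json

from google.cloud import storage


def load_attribution_config(job_config_gcp_project: str, job_config_bucket: str) -> dict:
    """Fetch attribution-conf.json from the configured GCS bucket and parse it."""
    client = storage.Client(project=job_config_gcp_project)
    bucket = client.bucket(job_config_bucket)
    blob = bucket.blob("attribution-conf.json")
    return json.loads(blob.download_as_text())


# Example call; the project and bucket names here are made up.
config = load_attribution_config("my-gcp-project", "my-config-bucket")
print(config["advertisers"][0]["collector_duration"])  # 604800 in the sample above
```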
Empty file.

New file: jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/collect.py (@@ -0,0 +1,190 @@, 190 additions, 0 deletions)
```python
import ast
import logging
import subprocess
import re

from datetime import date, datetime, timedelta

DAP_LEADER = "https://dap-09-3.api.divviup.org"
VDAF = "histogram"
PROCESS_TIMEOUT = 1200  # 20 mins


def get_aggregated_results(
    process_date: date,
    batch_start: date,
    batch_end: date,
    task_id: str,
    vdaf_length: int,
    collector_duration: int,
    bearer_token: str,
    hpke_config: str,
    hpke_private_key: str,
) -> dict:
    process_batch = _should_collect_batch(process_date, batch_end)

    if process_batch:
        # Step 4 Collect DAP results.
        aggregated_results = collect_dap_result(
            task_id=task_id,
            vdaf_length=vdaf_length,
            batch_start=batch_start,
            duration=collector_duration,
            bearer_token=bearer_token,
            hpke_config=hpke_config,
            hpke_private_key=hpke_private_key,
        )

    return aggregated_results


def current_batch_start(
    process_date: date, partner_start_date: date, duration: int
) -> date | None:
    if process_date < partner_start_date:
        return None

    if (
        partner_start_date
        <= process_date
        < partner_start_date + timedelta(seconds=duration)
    ):
        return partner_start_date

    # After the first interval ...
    batch_start = partner_start_date
    while True:
        next_start = batch_start + timedelta(seconds=duration)
        # check if the process_date is the batch_end date
        # if yes we only need to go back 1 duration to get the start
        if next_start + timedelta(days=-1) == process_date:
            return next_start + timedelta(seconds=-duration)

        # this means the process date is in the next interval so
        # need to go back 2 durations to get the batch_start
        if next_start > process_date:
            return next_start + timedelta(seconds=-2 * duration)

        batch_start = next_start


def current_batch_end(batch_start: date, duration: int) -> date:
    # since the start and end dates are inclusive need to subtract 1 from duration
    return batch_start + timedelta(seconds=duration, days=-1)


def _should_collect_batch(process_date, batch_end) -> bool:
    return batch_end == process_date


def _correct_wraparound(num: int) -> int:
    field_prime = 340282366920938462946865773367900766209
    field_size = 128
    cutoff = 2 ** (field_size - 1)
    if num > cutoff:
        return num - field_prime
    return num


def _parse_histogram(histogram_str: str) -> dict:
    parsed_list = ast.literal_eval(histogram_str)
    return {i: _correct_wraparound(val) for i, val in enumerate(parsed_list)}


def _parse_http_error(text: str) -> tuple[int, str, str | None] | None:
    """
    Returns (status_code, status_text, error_message)
    or None if the pattern is not found.
    """
    ERROR_RE = re.compile(
        r"HTTP response status\s+(\d+)\s+([A-Za-z ]+)(?:\s+-\s+(.*))?$"
    )
    match = ERROR_RE.search(text)
    if not match:
        return None

    status_code = int(match.group(1))
    status_text = match.group(2).strip()
    error_message = match.group(3).strip() if match.group(3) else None
    return status_code, status_text, error_message


# DAP functions
def collect_dap_result(
    task_id: str,
    vdaf_length: int,
    batch_start: date,
    duration: int,
    bearer_token: str,
    hpke_config: str,
    hpke_private_key: str,
) -> dict:
    # Beware! This command string reveals secrets. Use logging only for
    # debugging in local dev.

    batch_start_epoch = int(
        datetime.combine(batch_start, datetime.min.time()).timestamp()
    )

    try:
        result = subprocess.run(
            [
                "./collect",
                "--task-id",
                task_id,
                "--leader",
                DAP_LEADER,
                "--vdaf",
                VDAF,
                "--length",
                f"{vdaf_length}",
                "--authorization-bearer-token",
                bearer_token,
                "--batch-interval-start",
                f"{batch_start_epoch}",
                "--batch-interval-duration",
                f"{duration}",
                "--hpke-config",
                hpke_config,
                "--hpke-private-key",
                hpke_private_key,
            ],
            capture_output=True,
            text=True,
            check=True,
            timeout=PROCESS_TIMEOUT,
        )
        for line in result.stdout.splitlines():
            if line.startswith("Aggregation result:"):
                entries = _parse_histogram(line[21:-1])
                return entries
    # Beware! Exceptions thrown by the subprocess reveal secrets.
    # Log them and include traceback only for debugging in local dev.
    except subprocess.CalledProcessError as e:
        result = _parse_http_error(e.stderr)
        if result is None:
            logging.error(e)
            raise Exception(
                f"Collection failed for {task_id}, {e.returncode}, stderr: {e.stderr}"
            ) from None
        else:
            status_code, status_text, error_message = result
            if status_code == 400:
                logging.info(
                    f"Collection failed for {task_id}, {status_code} {status_text}"
                    f" {error_message}"
                )
            elif status_code == 404:
                detail = (
                    error_message
                    if error_message is not None
                    else "Verify start date is not more than 14 days ago."
                )
                logging.info(
                    f"Collection failed for {task_id}, {status_code} {status_text} "
                    f"{detail}"
                )
    except subprocess.TimeoutExpired as e:
        raise Exception(
            f"Collection timed out for {task_id}, {e.timeout}, stderr: {e.stderr}"
        ) from None
```
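To make the batch logic above concrete, here is a small usage sketch. The dates and durations mirror the sample config in the README (`start_date` 2026-01-08, `collector_duration` 604800); this is an illustration, not code from the PR.

```python
# Illustrative use of the batch-window helpers in collect.py.
from datetime import date

from ads_attribution_dap_collector.collect import (
    _correct_wraparound,
    _parse_http_error,
    _should_collect_batch,
    current_batch_end,
    current_batch_start,
)

partner_start = date(2026, 1, 8)  # sample start_date
duration = 604800                 # sample collector_duration: 7 days in seconds
process_date = date(2026, 1, 14)

batch_start = current_batch_start(process_date, partner_start, duration)
batch_end = current_batch_end(batch_start, duration)
assert batch_start == date(2026, 1, 8)
assert batch_end == date(2026, 1, 14)                  # start and end are inclusive
assert _should_collect_batch(process_date, batch_end)  # collect on the last day

# Negative aggregates come back wrapped around the DAP field prime.
assert _correct_wraparound(340282366920938462946865773367900766208) == -1

# stderr from the collect binary is reduced to (status code, status text, message).
assert _parse_http_error("HTTP response status 400 Bad Request - batch invalid") == (
    400,
    "Bad Request",
    "batch invalid",
)
```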
Review comment: does the `days=-1` make an assumption that collector duration is a multiple of seconds in a day? I didn't notice any check that would enforce that, so wondered if it was lurking here.
Reply: the Advertiser definition in schema.py requires the value to be `collector_duration: int = Field(gt=86399)  # 1 day - 1 sec in seconds` to guard against the duration being specified in hours or days.
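For context, the quoted constraint behaves like this in a pydantic model (a sketch assuming the Advertiser model in schema.py is pydantic-based; all other fields are omitted here):

```python
# Sketch of the collector_duration guard quoted above; not the full schema.
from pydantic import BaseModel, Field, ValidationError


class Advertiser(BaseModel):
    collector_duration: int = Field(gt=86399)  # 1 day - 1 sec in seconds


Advertiser(collector_duration=604800)  # 7 days expressed in seconds: accepted

try:
    Advertiser(collector_duration=7)  # 7 "days" passed as a raw number: rejected
except ValidationError as err:
    print(err)  # reports the value must be greater than 86399
```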