AE-956 [ads] Collect DAP results and add to BigQuery table #474
Conversation
mashalifshin
left a comment
This code looks great! I like how you chose to name the files. Tomorrow I will grab the example config from your MARS PR, put some data in DAP, and give it some manual testing.
My main question after reading the code is how we could DRY up the shared code between ads-attribution-dap-collector and ads-incrementality-dap-collector. Perhaps for a follow-up PR.
LOG_FILE_NAME = f"{datetime.now()}-ads-newtab-attribution-dap-collector.log"

def write_job_logs_to_bucket(gcp_project: str, config_bucket: str):
Maybe this function could go into the persist.py file so that main.py only contains the main function?
persist.py was intended to isolate the BQ code, which is why write_job_logs_to_bucket wasn't included there.
Ahh okay, that makes sense, I was thinking "persist" could mean more generally "persist logs to a file"...I wonder if there is some other place this could go, for easier readability of the main file.
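For context, a minimal sketch of what a log-upload helper like this might look like, assuming the google-cloud-storage client; the blob path and the use of LOG_FILE_NAME below are illustrative, not the PR's actual implementation:

from google.cloud import storage

def write_job_logs_to_bucket(gcp_project: str, config_bucket: str) -> None:
    # Hypothetical sketch: upload this run's local log file to the job's
    # GCS bucket so the logs survive after the container exits.
    client = storage.Client(project=gcp_project)
    bucket = client.bucket(config_bucket)
    blob = bucket.blob(f"logs/{LOG_FILE_NAME}")  # LOG_FILE_NAME as defined above
    blob.upload_from_filename(LOG_FILE_NAME)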
jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/schema.py (outdated comment thread, resolved)
The code bases for each job are separate (each job has its own GHA and Docker image), making DRY probably more work than it is worth at this point, but something we could look at in the future.
scholtzan
left a comment
This looks good from a DE perspective. The job seems to just write to ads_dap_derived.newtab_attribution_v1.
)
self.assertEqual(batch_start, date(2026, 1, 15))

def test_current_batch_end_(self):
trailing _ on test name?
Yep, updated.
Note: ads keys are dynamic (source:id), so they remain a dict[str, AdModel].
"""

model_config = ConfigDict(extra="forbid")
I interpret this to mean that if the JSON config has any extra fields, that would be an error. What would the process be for adjusting the shape of the config file, and is there any reason to use forbid over the default ignore?
No reason; forbid is too tight here and would mean needing to update this job whenever other consumers of the file require changes to the config. Updated.
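For reference, a minimal Pydantic v2 sketch of the difference; the model and field names are illustrative, not the PR's actual schema:

from pydantic import BaseModel, ConfigDict

class AdvertiserConfig(BaseModel):
    # extra="ignore" is Pydantic's default: unknown fields in the input are
    # dropped, so other consumers can extend the shared config file without
    # breaking this job. With extra="forbid" the same input would raise a
    # ValidationError.
    model_config = ConfigDict(extra="ignore")
    name: str
    collector_duration: int

# Validates even though "new_field" is unknown to this job:
cfg = AdvertiserConfig.model_validate(
    {"name": "nature", "collector_duration": 604800, "new_field": True}
)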
try:
    ad_id = int(ad_id_str)
except ValueError:
    raise ValueError(
        f"Skipping invalid ad key '{ad_key}': ad_id '{ad_id_str}' is not an integer"
    )
The assumption that ad_id is an integer is tough, because ad_id is an externally defined ID space.
Updated the code and table to use a string instead.
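A hypothetical sketch of the string-based handling, assuming the source:id key shape noted earlier; parse_ad_key and its error message are illustrative, not the PR's code:

def parse_ad_key(ad_key: str) -> tuple[str, str]:
    # Keys look like "source:id". ad_id stays a string because the id space
    # is externally defined and not guaranteed to be numeric.
    source, sep, ad_id = ad_key.partition(":")
    if not sep or not source or not ad_id:
        raise ValueError(f"Skipping invalid ad key '{ad_key}': expected 'source:id'")
    return source, ad_id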
def current_batch_end(batch_start: date, duration: int) -> date:
    # since the start and end dates are inclusive, add the duration and back off one day
    return batch_start + timedelta(seconds=duration, days=-1)
Does the days=-1 make an assumption that the collector duration is a multiple of seconds in a day? I didn't notice any check that would enforce that, so I wondered if it was lurking here.
The Advertiser definition in schema.py requires the value to be
collector_duration: int = Field(gt=86399)  # 1 day - 1 sec, in seconds
to guard against the duration being specified in hours or days.
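To make the inclusive-end arithmetic concrete, a standalone sketch (not the PR's test code) using the 7-day example that comes up in testing below:

from datetime import date, timedelta

def current_batch_end(batch_start: date, duration: int) -> date:
    # start and end dates are inclusive, so add the duration and back off one day
    return batch_start + timedelta(seconds=duration, days=-1)

# A 7-day batch (604800 seconds) starting 2026-01-15 ends on 2026-01-21:
assert current_batch_end(date(2026, 1, 15), 604800) == date(2026, 1, 21)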
mashalifshin
left a comment
Glenda, I'm working through testing locally/in my dev sandbox.
Verified
- Fails gracefully with good error when config is missing
- Fails gracefully with good error when config is invalid
Now I'm trying to test the happy path, but I can't get it to collect results when I'd expect.
For a batch start of 1-15-2026, and a batch duration of 604800 (7 days), shouldn't it collect on process_date 1-22-2026? I've tried a couple days around that (1-23, 1-24, 1-25), and it always says
No results available for advertiser: nature with start_date: 2026-01-15 and process_date: 2026-01-22
I double-checked that the task ID I created for this experiment is in the partners list for the nature advertiser's partner id ... am I missing something, or could it be a bug?
def write_job_logs_to_bucket(gcp_project: str, config_bucket: str):
Ahh okay, that makes sense, I was thinking "persist" could mean more generally "persist logs to a file"...I wonder if there is some other place this could go, for easier readability of the main file.
Yeah, when we have time maybe we can extract a shared Python lib for the GCP stuff, and another for the DAP stuff, and also clean up some of the older jobs that (I think) this one will take precedence over.
@gleonard-m one more thought, would you mind adding an example to the README.md?
Added an example to the README.md.
mashalifshin
left a comment
@gleonard-m thanks for your help troubleshooting why I couldn't collect. I was indeed missing something: the date to collect for a 7-day batch that starts 1-15-2026 is actually 1-21-2026. I forgot about the adjustment Airflow makes to pass the previous date when it runs the job (see the sketch after this comment).
So it works like a charm! Verified
- Advertisers with start dates in the future get gracefully skipped
- Can successfully collect with the right process date, config file, and DAP setup
Yep, added an example to the README.md.
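A small sketch of the Airflow adjustment being described, assuming a daily schedule; the dates match the test scenario above:

from datetime import date, timedelta

# For a daily DAG, the run executing on a given day is handed the previous
# day's date (the logical date, {{ ds }} in Airflow templates). So the run
# executing on 2026-01-22 passes process_date 2026-01-21, which is exactly
# the inclusive end of the 7-day batch that starts 2026-01-15.
execution_day = date(2026, 1, 22)
process_date = execution_day - timedelta(days=1)
assert process_date == date(2026, 1, 21)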
AE-956
This docker-etl job reads configuration from an ads-owned bucket, collects DAP results, and inserts them into a BigQuery table. If the table does not exist, the job creates it.
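A minimal sketch of the create-if-missing pattern, assuming the google-cloud-bigquery client; the project id and schema fields are illustrative (the destination table noted in review is ads_dap_derived.newtab_attribution_v1):

from google.cloud import bigquery

client = bigquery.Client(project="my-gcp-project")  # hypothetical project id
table_id = "my-gcp-project.ads_dap_derived.newtab_attribution_v1"
schema = [
    bigquery.SchemaField("ad_id", "STRING"),   # string, per the review discussion
    bigquery.SchemaField("count", "INTEGER"),  # illustrative metric column
]
# exists_ok=True makes this a no-op when the table already exists
client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)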
Checklist for reviewer:
- Commits should reference a bug or GitHub issue, if relevant (if a bug is referenced, the pull request should include the bug number in the title)
- Scan the PR and verify that no changes (particularly to .circleci/config.yml) will cause environment variables (particularly credentials) to be exposed in test logs
- Ensure the container image will be using permissions granted to telemetry-airflow responsibly.
Note for deployments: In order to push images built by this PR, the user who merges the PR must be in the telemetry GitHub team. This is because deploys depend on the data-eng-airflow-gcr CircleCI context. See DENG-8850 for additional discussion.