
Conversation

@gleonard-m
Contributor

@gleonard-m gleonard-m commented Jan 22, 2026

AE-956

This docker-etl job reads configuration from an ads-owned bucket, collects DAP results, and inserts them into a BigQuery table. If the table does not exist, the job creates it.
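
For readers unfamiliar with the pattern, a minimal sketch of that read-config / create-if-missing / insert flow with the google-cloud clients might look like the following. This is illustrative only, not this PR's code; the bucket, blob, schema, and table names are placeholders.

```python
# Illustrative sketch only (not this PR's code): read JSON config from an
# ads-owned GCS bucket, then append collected results to a BigQuery table,
# creating the table first if it does not exist. All names are placeholders.
import json

from google.api_core.exceptions import NotFound
from google.cloud import bigquery, storage


def load_config(gcp_project: str, config_bucket: str, config_blob: str) -> dict:
    gcs = storage.Client(project=gcp_project)
    return json.loads(gcs.bucket(config_bucket).blob(config_blob).download_as_text())


def write_results(gcp_project: str, table_id: str, rows: list[dict]) -> None:
    bq = bigquery.Client(project=gcp_project)
    try:
        bq.get_table(table_id)  # raises NotFound when the table is missing
    except NotFound:
        schema = [bigquery.SchemaField("advertiser", "STRING")]  # placeholder schema
        bq.create_table(bigquery.Table(table_id, schema=schema))
    errors = bq.insert_rows_json(table_id, rows)
    if errors:
        raise RuntimeError(f"BigQuery insert errors: {errors}")
```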

Checklist for reviewer:

  • Commits should reference a bug or github issue, if relevant (if a bug is
    referenced, the pull request should include the bug number in the title)
  • Scan the PR and verify that no changes (particularly to
    .circleci/config.yml) will cause environment variables (particularly
    credentials) to be exposed in test logs
  • Ensure the container image will be using permissions granted to
    telemetry-airflow
    responsibly.

Note for deployments: In order to push images built by this PR, the user who merges the PR
must be in the telemetry GitHub team.
This is because deploys depend on the
data-eng-airflow-gcr CircleCI context.
See DENG-8850 for additional discussion.

@gleonard-m gleonard-m requested a review from a team as a code owner January 22, 2026 18:58
Contributor

@mashalifshin mashalifshin left a comment


This code looks great! I like how you chose to name the files. Tomorrow I will grab the example config from your MARS PR, put some data in DAP, and give it some manual testing.

My main question after reading the code is how we could DRY up some of the shared code between ads-attribution-dap-collector and ads-incrementality-dap-collector. Perhaps for a follow-up PR.

LOG_FILE_NAME = f"{datetime.now()}-ads-newtab-attribution-dap-collector.log"


def write_job_logs_to_bucket(gcp_project: str, config_bucket: str):
Contributor


Maybe this function could go into the persist.py file so that main.py only contains the main function?

Contributor Author


persist.py was intended to isolate the BQ code, which is why write_job_logs_to_bucket wasn't included there.

Contributor


Ahh okay, that makes sense, I was thinking "persist" could mean more generally "persist logs to a file"...I wonder if there is some other place this could go, for easier readability of the main file.

@gleonard-m
Contributor Author

This code looks great! I like how you chose to name the files. Tomorrow I will grab the example config from your MARS PR, put some data in DAP, and give it some manual testing.

My main question after reading the code is how we could DRY up some of the shared code between ads-attribution-dap-collector and ads-incrementality-dap-collector. Perhaps for a follow-up PR.

The codebases for each job are separate (each job has its own GHA and Docker image), making DRY probably more work than it is worth at this point, but something we could look at in the future.

Contributor

@scholtzan scholtzan left a comment


This looks good from a DE perspective. The job seems to just write to ads_dap_derived.newtab_attribution_v1.

)
self.assertEqual(batch_start, date(2026, 1, 15))

def test_current_batch_end_(self):
Contributor


trailing _ on test name?

Contributor Author

@gleonard-m gleonard-m Jan 26, 2026


Yep, updated.

Note: ads keys are dynamic (source:id), so they remain a dict[str, AdModel].
"""

model_config = ConfigDict(extra="forbid")
Contributor


I interpret this to mean that if the JSON config has any extra fields, that would be an error. What would the process be for adjusting the shape of the config file / any reason to use forbid over the default ignore?

Contributor Author

@gleonard-m gleonard-m Jan 26, 2026


No reason; forbid is too tight here and would mean this job has to be updated whenever other consumers of the file require changes to the config. Updated.
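
For context, a minimal pydantic v2 sketch of the difference being discussed; the model and field names here are hypothetical, not the PR's actual schema:

```python
from pydantic import BaseModel, ConfigDict


class AdvertiserConfig(BaseModel):
    # extra="ignore" (the pydantic default) silently drops unknown fields, so
    # other consumers can add fields to the shared config without breaking this job.
    model_config = ConfigDict(extra="ignore")

    name: str
    collector_duration: int


# With extra="forbid" this call would raise a ValidationError because of "future_field".
cfg = AdvertiserConfig.model_validate(
    {"name": "nature", "collector_duration": 604800, "future_field": "ok"}
)
```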

Comment on lines 71 to 76
try:
ad_id = int(ad_id_str)
except ValueError:
raise ValueError(
f"Skipping invalid ad key '{ad_key}': ad_id '{ad_id_str}' is not an integer"
)
Contributor


The assumption that ad_id is an integer is tough because ad_id is an externally defined id space.

Contributor Author


Updated the code and table to use a string instead.
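
For illustration, a small sketch of the string-based handling (a hypothetical helper, not the updated code itself), using the source:id key format noted in the schema docstring above:

```python
# Hypothetical helper, not the PR's updated code: split a "source:id" ad key
# and keep ad_id as an opaque string, since the id space is externally defined.
def split_ad_key(ad_key: str) -> tuple[str, str]:
    source, sep, ad_id = ad_key.partition(":")
    if not sep or not source or not ad_id:
        raise ValueError(f"Invalid ad key '{ad_key}': expected 'source:id'")
    return source, ad_id
```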


def current_batch_end(batch_start: date, duration: int) -> date:
# since the start and end dates are inclusive, we need to subtract one day
return batch_start + timedelta(seconds=duration, days=-1)
Contributor


Does the days=-1 make an assumption that the collector duration is a multiple of seconds in a day? I didn't notice any check that would enforce that, so I wondered if it was lurking here.

Contributor Author


The Advertiser definition in schema.py requires the value to be
collector_duration: int = Field(gt=86399)  # 1 day - 1 sec, in seconds
to guard against the duration being specified in hours or days.
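
Putting the constraint and the date math together, a small worked sketch (the Advertiser model here is trimmed down to the one field under discussion):

```python
from datetime import date, timedelta

from pydantic import BaseModel, Field


class Advertiser(BaseModel):
    collector_duration: int = Field(gt=86399)  # at least one full day, in seconds


def current_batch_end(batch_start: date, duration: int) -> date:
    # the start and end dates are inclusive, so subtract one day
    return batch_start + timedelta(seconds=duration, days=-1)


# A 7-day (604800 s) batch starting 2026-01-15 ends on 2026-01-21, inclusive.
print(current_batch_end(date(2026, 1, 15), 604800))  # 2026-01-21
```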

Contributor

@mashalifshin mashalifshin left a comment


Glenda, I'm working through testing locally/in my dev sandbox.
Verified

  • Fails gracefully with good error when config is missing
  • Fails gracefully with good error when config is invalid

Now I'm trying to test the happy path, but I can't get it to collect results when I'd expect.
For a batch start of 1-15-2026, and a batch duration of 604800 (7 days), shouldn't it collect on process_date 1-22-2026? I've tried a couple days around that (1-23, 1-24, 1-25), and it always says

No results available for advertiser: nature with start_date: 2026-01-15 and process_date: 2026-01-22

I double-checked that the task ID I created for this experiment is in the partners list for the nature advertiser's partner id ... am I missing something, or could it be a bug?


@mashalifshin
Contributor

The codebases for each job are separate (each job has its own GHA and Docker image), making DRY probably more work than it is worth at this point, but something we could look at in the future.

Yeah, when we have time maybe we can extract a shared Python lib for the GCP stuff, and another for the DAP stuff. And also clean up some of the older jobs that (I think) this one will take precedence over.

@mashalifshin
Contributor

@gleonard-m one more thought, would you mind adding an example-config.json file somewhere in the root here, or in the README? Just a convenience for the local dev and for testing (had to grab this from the MARS PR).

@gleonard-m
Contributor Author

@gleonard-m one more thought, would you mind adding an example-config.json file somewhere in the root here, or in the README? Just a convenience for the local dev and for testing (had to grab this from the MARS PR).

Added an example to the README.md.

Contributor

@mashalifshin mashalifshin left a comment


@gleonard-m thanks for your help troubleshooting why I couldn't collect; I was indeed missing something: the date to collect for a 7-day batch that starts 1-15-2026 is actually 1-21-2026. I forgot about the adjustment Airflow makes to pass the previous date when it runs the job.

So it works like a charm! Verified

  • Advertisers with start dates in the future get gracefully skipped
  • Can successfully collect with the right process date, config file, and DAP setup
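
As a concrete worked example of the adjustment described above (a sketch only, not the actual telemetry-airflow DAG logic; the dates and duration are the ones from this thread):

```python
# Illustrative sketch of the Airflow adjustment mentioned above: a daily run
# that executes on 2026-01-22 is handed the previous day's date, so the job
# receives process_date 2026-01-21, which matches the inclusive batch end for
# a 7-day (604800 s) batch starting 2026-01-15.
from datetime import date, timedelta

run_date = date(2026, 1, 22)                 # day the DAG run actually executes
process_date = run_date - timedelta(days=1)  # date Airflow passes to the job
batch_end = date(2026, 1, 15) + timedelta(seconds=604_800, days=-1)
assert process_date == batch_end == date(2026, 1, 21)
```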

@gleonard-m
Contributor Author

@gleonard-m one more thought, would you mind adding an example-config.json file somewhere in the root here, or in the README? Just a convenience for the local dev and for testing (had to grab this from the MARS PR).

Yep, added an example to the README.md

@gleonard-m gleonard-m merged commit 8a2fb67 into main Jan 27, 2026
3 checks passed
@gleonard-m gleonard-m deleted the AE-956-etl-read-results-from-attribution-tasks-into-bq branch January 27, 2026 17:47