diff --git a/README.md b/README.md index 3d60ba20..1f20ba8c 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ fct_dbt__model_executions fct_dbt__seed_executions fct_dbt__snapshot_executions fct_dbt__test_executions +fct_dbt__source_executions ``` See the generated [dbt docs site](https://brooklyn-data.github.io/dbt_artifacts/#!/overview) for documentation on each model. @@ -75,6 +76,13 @@ packages: dbt run --select dbt_artifacts ``` +5. If you want to include results from the `dbt source freshness` command, you'll need to add the following flag to your `dbt_project.yml`. This flag was added recently and is backwards-compatible with older dbt-core versions, but it is opt-in, so you must set it explicitly. + + ```yml + flags: + source_freshness_run_project_hooks: true + ``` + ### Notes on upgrading Due to the structure of the project, when additional fields are added, the package needs to be re-run so that the tables include the new field; otherwise the on-run-end hook will error. These changes will always be implemented within a new **minor** version, so make sure that the version you use in `packages.yml` reflects this. diff --git a/integration_test_project/dbt_project.yml b/integration_test_project/dbt_project.yml index aae6f8df..3e244833 100644 --- a/integration_test_project/dbt_project.yml +++ b/integration_test_project/dbt_project.yml @@ -30,5 +30,10 @@ models: seeds: +quote_columns: false +flags: + source_freshness_run_project_hooks: true + send_anonymous_usage_stats: false + use_colors: true + on-run-end: - "{{ dbt_artifacts.upload_results(results) }}" diff --git a/integration_test_project/models/sources.yml b/integration_test_project/models/sources.yml index 4d7c2944..9454f635 100644 --- a/integration_test_project/models/sources.yml +++ b/integration_test_project/models/sources.yml @@ -4,10 +4,6 @@ sources: - name: dummy_source database: "{% if target.type not in ('spark', 'databricks') %}{{ var('dbt_artifacts_database', target.database) }}{% endif %}" schema: "{{ target.schema }}" - freshness: - error_after: {count: 24, period: hour} - filter: dayname(updatedat) not in ('Sunday', 'Monday') - loaded_at_field: convert_timezone('UTC', load_timestamp) tables: - name: doesnt_exist - name: '"GROUP"' @@ -16,7 +12,8 @@ sources: database: "{% if target.type not in ('spark', 'databricks') %}{{ var('dbt_artifacts_database', target.database) }}{% endif %}" schema: "{{ target.schema }}" freshness: - error_after: {count: 1, period: hour} + error_after: {count: 3, period: day} + warn_after: {count: 1, period: day} loaded_at_field: load_timestamp tables: - name: freshness diff --git a/integration_test_project/profiles.yml b/integration_test_project/profiles.yml index 84d85ddc..ab3b3986 100644 --- a/integration_test_project/profiles.yml +++ b/integration_test_project/profiles.yml @@ -1,10 +1,6 @@ # HEY! This file is used in the dbt-artifacts integration tests with GitHub Actions. # You should __NEVER__ check credentials into version control. 
Thanks for reading :) -config: - send_anonymous_usage_stats: False - use_colors: True - dbt_artifacts: target: snowflake outputs: diff --git a/integration_test_project/seeds/freshness.csv b/integration_test_project/seeds/freshness.csv index 9b4f320c..6d52c5a0 100644 --- a/integration_test_project/seeds/freshness.csv +++ b/integration_test_project/seeds/freshness.csv @@ -3,3 +3,4 @@ id,load_timestamp 102,2025-04-09 12:05:00 103,2025-04-09 12:10:00 104,2025-04-09 12:15:00 +105,2025-05-25 12:30:00 diff --git a/macros/upload_individual_datasets/upload_source_executions.sql b/macros/upload_individual_datasets/upload_source_executions.sql new file mode 100644 index 00000000..791700f0 --- /dev/null +++ b/macros/upload_individual_datasets/upload_source_executions.sql @@ -0,0 +1,209 @@ +{% macro upload_source_executions(sources) %} + {{ return(adapter.dispatch('get_source_executions_dml_sql', 'dbt_artifacts')(sources)) }} +{% endmacro %} + +{% macro default__get_source_executions_dml_sql(sources) -%} + {% if sources != [] %} + + {% set source_execution_values %} + select + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(1) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(2) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(3) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(4) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(5) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(6) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(7) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(8) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(9) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(12) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(13) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(14) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(15) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(16) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(17) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(18) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(19) }}, + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(20)) }} + from values + {% for source in sources -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ source.node.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + '{{ source.thread_id }}', {# thread_id #} + '{{ source.status }}', {# status #} + + {% set compile_started_at = (source.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (source.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} + + {{ source.execution_time }}, {# total_node_runtime #} + '{{ source.node.schema }}', {# schema #} + '{{ source.node.name }}', {# name #} + '{{ source.node.source_name }}', {# source_name #} + '{{ source.node.loaded_at_field }}', {# loaded_at_field #} + {% if source.node.freshness.warn_after.count is not none %}{{ 
source.node.freshness.warn_after.count }}{% else %}null{% endif %}, {# warn_after_count #} + {% if source.node.freshness.warn_after.period is not none %}'{{ source.node.freshness.warn_after.period }}'{% else %}null{% endif %}, {# warn_after_period #} + {% if source.node.freshness.error_after.count is not none %}{{ source.node.freshness.error_after.count }}{% else %}null{% endif %}, {# error_after_count #} + {% if source.node.freshness.error_after.period is not none %}'{{ source.node.freshness.error_after.period }}'{% else %}null{% endif %}, {# error_after_period #} + '{{ source.max_loaded_at }}', {# max_loaded_at #} + '{{ source.snapshotted_at }}', {# snapshotted_at #} + {{ source.age }}, {# age #} + '{{ tojson(source.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {% endfor %} + {% endset %} + {{ source_execution_values }} + {% else %} + {{ return ("") }} + {% endif %} +{% endmacro -%} + + +{% macro bigquery__get_source_executions_dml_sql(sources) -%} + {% if sources != [] %} + + {% set source_execution_values %} + {% for source in sources -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ source.node.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + '{{ source.thread_id }}', {# thread_id #} + '{{ source.status }}', {# status #} + + {% set compile_started_at = (source.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (source.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} + + {{ source.execution_time }}, {# total_node_runtime #} + '{{ source.node.schema }}', {# schema #} + '{{ source.node.name }}', {# name #} + '{{ source.node.source_name }}', {# source_name #} + '{{ source.node.loaded_at_field }}', {# loaded_at_field #} + '{{ source.node.freshness.warn_after.count }}', {# warn_after_count #} + '{{ source.node.freshness.warn_after.period }}', {# warn_after_period #} + '{{ source.node.freshness.error_after.count }}', {# error_after_count #} + '{{ source.node.freshness.error_after.period }}', {# error_after_period #} + '{{ source.max_loaded_at }}', {# max_loaded_at #} + '{{ source.snapshotted_at }}', {# snapshotted_at #} + {{ source.age }}, {# age #} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(source.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"')) }} {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {% endfor %} + {% endset %} + {{ source_execution_values }} + {% else %} + {{ return ("") }} + {% endif %} +{% endmacro -%} + + +{% macro snowflake__get_source_executions_dml_sql(sources) -%} + {% if sources != [] %} + + {% set source_execution_values %} + select + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(1) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(2) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(3) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(4) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(5) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(6) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(7) }}, + {{ 
adapter.dispatch('column_identifier', 'dbt_artifacts')(8) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(9) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(12) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(13) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(14) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(15) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(16) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(17) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(18) }}, + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(19) }}, + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(20)) }} + from values + {% for source in sources -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ source.node.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + '{{ source.thread_id }}', {# thread_id #} + '{{ source.status }}', {# status #} + + {% set compile_started_at = (source.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (source.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} + + {{ source.execution_time }}, {# total_node_runtime #} + '{{ source.node.schema }}', {# schema #} + '{{ source.node.name }}', {# name #} + '{{ source.node.source_name }}', {# source_name #} + '{{ source.node.loaded_at_field }}', {# loaded_at_field #} + {% if source.node.freshness.warn_after.count is not none %}{{ source.node.freshness.warn_after.count }}{% else %}null{% endif %}, {# warn_after_count #} + {% if source.node.freshness.warn_after.period is not none %}'{{ source.node.freshness.warn_after.period }}'{% else %}null{% endif %}, {# warn_after_period #} + {% if source.node.freshness.error_after.count is not none %}{{ source.node.freshness.error_after.count }}{% else %}null{% endif %}, {# error_after_count #} + {% if source.node.freshness.error_after.period is not none %}'{{ source.node.freshness.error_after.period }}'{% else %}null{% endif %}, {# error_after_period #} + '{{ source.max_loaded_at }}', {# max_loaded_at #} + '{{ source.snapshotted_at }}', {# snapshotted_at #} + {{ source.age }}, {# age #} + '{{ tojson(source.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {% endfor %} + {% endset %} + {{ source_execution_values }} + {% else %} + {{ return ("") }} + {% endif %} +{% endmacro -%} + +{% macro postgres__get_source_executions_dml_sql(sources) -%} + {% if sources != [] %} + + {% set source_execution_values %} + {% for source in sources -%} + ( + '{{ invocation_id }}', {# command_invocation_id #} + '{{ source.node.unique_id }}', {# node_id #} + '{{ run_started_at }}', {# run_started_at #} + '{{ source.thread_id }}', {# thread_id #} + '{{ source.status }}', {# status #} + + {% set compile_started_at = (source.timing | selectattr("name", "eq", "compile") | first | default({}))["started_at"] %} + {% if compile_started_at %}'{{ 
compile_started_at }}'{% else %}null{% endif %}, {# compile_started_at #} + {% set query_completed_at = (source.timing | selectattr("name", "eq", "execute") | first | default({}))["completed_at"] %} + {% if query_completed_at %}'{{ query_completed_at }}'{% else %}null{% endif %}, {# query_completed_at #} + + {{ source.execution_time }}, {# total_node_runtime #} + '{{ source.node.schema }}', {# schema #} + '{{ source.node.name }}', {# name #} + '{{ source.node.source_name }}', {# source_name #} + '{{ source.node.loaded_at_field }}', {# loaded_at_field #} + '{{ source.node.freshness.warn_after.count }}', {# warn_after_count #} + '{{ source.node.freshness.warn_after.period }}', {# warn_after_period #} + '{{ source.node.freshness.error_after.count }}', {# error_after_count #} + '{{ source.node.freshness.error_after.period }}', {# error_after_period #} + '{{ source.max_loaded_at }}', {# max_loaded_at #} + '{{ source.snapshotted_at }}', {# snapshotted_at #} + {{ source.age }}, {# age #} + $${{ tojson(source.adapter_response) }}$$ {# adapter_response #} + ) + {%- if not loop.last %},{%- endif %} + {% endfor %} + {% endset %} + {{ source_execution_values }} + {% else %} + {{ return ("") }} + {% endif %} +{% endmacro -%} diff --git a/macros/upload_results/get_column_name_lists.sql b/macros/upload_results/get_column_name_lists.sql index 707e0c2c..eb38b56e 100644 --- a/macros/upload_results/get_column_name_lists.sql +++ b/macros/upload_results/get_column_name_lists.sql @@ -227,6 +227,31 @@ all_results ) + {% elif dataset == 'source_executions' %} + + ( + command_invocation_id, + node_id, + run_started_at, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + schema, + name, + source_name, + loaded_at_field, + warn_after_count, + warn_after_period, + error_after_count, + error_after_period, + max_loaded_at, + snapshotted_at, + age, + adapter_response + ) + {% else %} /* No column list available */ diff --git a/macros/upload_results/get_dataset_content.sql b/macros/upload_results/get_dataset_content.sql index b34e39cd..e318c69d 100644 --- a/macros/upload_results/get_dataset_content.sql +++ b/macros/upload_results/get_dataset_content.sql @@ -1,6 +1,6 @@ {% macro get_dataset_content(dataset) %} - {% if dataset in ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] %} + {% if dataset in ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions', 'source_executions'] %} {# Executions make use of the results object #} {% set objects = results | selectattr("node.resource_type", "equalto", dataset.split("_")[0]) | list %} {% elif dataset in ['seeds', 'snapshots', 'tests', 'models'] %} diff --git a/macros/upload_results/get_table_content_values.sql b/macros/upload_results/get_table_content_values.sql index 277899c5..6b459341 100644 --- a/macros/upload_results/get_table_content_values.sql +++ b/macros/upload_results/get_table_content_values.sql @@ -10,6 +10,8 @@ {% set content = dbt_artifacts.upload_test_executions(objects_to_upload) %} {% elif dataset == 'snapshot_executions' %} {% set content = dbt_artifacts.upload_snapshot_executions(objects_to_upload) %} + {% elif dataset == 'source_executions' %} + {% set content = dbt_artifacts.upload_source_executions(objects_to_upload) %} {% elif dataset == 'exposures' %} {% set content = dbt_artifacts.upload_exposures(objects_to_upload) %} {% elif dataset == 'models' %}
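With the plumbing above in place, you can sanity-check the new dataset end to end after a `dbt source freshness` invocation. A minimal smoke-check sketch, assuming the package's models build into an `analytics.dbt_artifacts` schema (hypothetical — substitute wherever your artifacts tables actually live):

```sql
-- Hypothetical smoke check: one row per source freshness execution
-- from the most recent uploaded run.
select
    node_id,
    status,
    max_loaded_at,
    snapshotted_at,
    age
from analytics.dbt_artifacts.source_executions
where run_started_at = (
    select max(run_started_at)
    from analytics.dbt_artifacts.source_executions
)
order by node_id;
```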
diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql index 114a667d..6e06121e 100644 --- a/macros/upload_results/upload_results.sql +++ b/macros/upload_results/upload_results.sql @@ -7,7 +7,7 @@ {% set datasets_to_load = ['exposures', 'seeds', 'snapshots', 'invocations', 'sources', 'tests', 'models'] %} {% if results != [] %} {# When executing, and results are available, then upload the results #} - {% set datasets_to_load = ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] + datasets_to_load %} + {% set datasets_to_load = ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions', 'source_executions'] + datasets_to_load %} {% endif %} {# Upload each data set in turn #} diff --git a/models/docs.md b/models/docs.md index 64b420cb..b884e896 100644 --- a/models/docs.md +++ b/models/docs.md @@ -412,3 +412,45 @@ Response provided by the adapter as JSON. All results as a JSON blob {% enddocs %} + +{% docs warn_after_count %} + +Positive integer indicating a threshold to warn (used alongside warn_after_period) + +{% enddocs %} + +{% docs warn_after_period %} + +The period unit (minute, hour or day) used alongside warn_after_count for a warn threshold + +{% enddocs %} + +{% docs error_after_count %} + +Positive integer indicating a threshold to error (used alongside error_after_period) + +{% enddocs %} + +{% docs error_after_period %} + +The period unit (minute, hour or day) used alongside error_after_count for an error threshold + +{% enddocs %} + +{% docs max_loaded_at %} + +Maximum value of the loaded_at_field timestamp in the source table when queried + +{% enddocs %} + +{% docs snapshotted_at %} + +Current timestamp when the source table was queried + +{% enddocs %} + +{% docs age %} + +Interval between max_loaded_at and snapshotted_at, calculated in Python to handle timezone complexity + +{% enddocs %} \ No newline at end of file
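To make these freshness fields concrete: `age` is the gap between `snapshotted_at` and `max_loaded_at`, which dbt reports as a float number of seconds (hence the `type_float()` typing below). A worked illustration with hypothetical values — dbt computes this in Python, and the Snowflake SQL here is only for intuition:

```sql
-- Hypothetical illustration of how age relates to the two timestamps.
select
    '2025-05-25 12:30:00'::timestamp as max_loaded_at,   -- newest loaded record
    '2025-05-25 14:30:00'::timestamp as snapshotted_at,  -- when freshness was checked
    datediff(
        'second',
        '2025-05-25 12:30:00'::timestamp,
        '2025-05-25 14:30:00'::timestamp
    ) as age;  -- 7200 seconds, i.e. two hours stale
```

Against a `warn_after` of `{count: 1, period: day}`, this row would still pass; the warn and error thresholds are compared against this same age value.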
diff --git a/models/fct_dbt__source_executions.sql b/models/fct_dbt__source_executions.sql new file mode 100644 index 00000000..27ede811 --- /dev/null +++ b/models/fct_dbt__source_executions.sql @@ -0,0 +1,38 @@ +with base as ( + + select * + from {{ ref('stg_dbt__source_executions') }} + +), + +source_executions as ( + + select + source_execution_id, + command_invocation_id, + node_id, + run_started_at, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + {% if target.type == "sqlserver" %} "schema" + {% else %} schema + {% endif %}, + name, + source_name, + loaded_at_field, + warn_after_count, + warn_after_period, + error_after_count, + error_after_period, + max_loaded_at, + snapshotted_at, + age, + adapter_response + from base + +) + +select * from source_executions diff --git a/models/fct_dbt__source_executions.yml b/models/fct_dbt__source_executions.yml new file mode 100644 index 00000000..36e4e612 --- /dev/null +++ b/models/fct_dbt__source_executions.yml @@ -0,0 +1,46 @@ +version: 2 + +models: + - name: fct_dbt__source_executions + description: Fact model for data about source freshness executions. One row per source freshness execution. + columns: + - name: command_invocation_id + description: '{{ doc("command_invocation_id") }}' + - name: node_id + description: '{{ doc("node_id") }}' + - name: run_started_at + description: '{{ doc("run_started_at") }}' + - name: thread_id + description: '{{ doc("thread_id") }}' + - name: status + description: '{{ doc("status") }}' + - name: compile_started_at + description: '{{ doc("compile_started_at") }}' + - name: query_completed_at + description: '{{ doc("query_completed_at") }}' + - name: total_node_runtime + description: '{{ doc("total_node_runtime") }}' + - name: schema + description: '{{ doc("schema") }}' + - name: name + description: '{{ doc("name") }}' + - name: source_name + description: '{{ doc("source_name") }}' + - name: loaded_at_field + description: '{{ doc("loaded_at_field") }}' + - name: warn_after_count + description: '{{ doc("warn_after_count") }}' + - name: warn_after_period + description: '{{ doc("warn_after_period") }}' + - name: error_after_count + description: '{{ doc("error_after_count") }}' + - name: error_after_period + description: '{{ doc("error_after_period") }}' + - name: max_loaded_at + description: '{{ doc("max_loaded_at") }}' + - name: snapshotted_at + description: '{{ doc("snapshotted_at") }}' + - name: age + description: '{{ doc("age") }}' + - name: adapter_response + description: '{{ doc("adapter_response") }}' diff --git a/models/sources/source_executions.sql b/models/sources/source_executions.sql new file mode 100644 index 00000000..c562694a --- /dev/null +++ b/models/sources/source_executions.sql @@ -0,0 +1,31 @@ +/* BigQuery won't let us `where` without `from` so we use this workaround */ +with dummy_cte as ( + select 1 as foo +) + +select + cast(null as {{ type_string() }}) as command_invocation_id, + cast(null as {{ type_string() }}) as node_id, + cast(null as {{ type_timestamp() }}) as run_started_at, + cast(null as {{ type_string() }}) as thread_id, + cast(null as {{ type_string() }}) as status, + cast(null as {{ type_timestamp() }}) as compile_started_at, + cast(null as {{ type_timestamp() }}) as query_completed_at, + cast(null as {{ type_float() }}) as total_node_runtime, + cast(null as {{ type_string() }}) as + {% if target.type == "sqlserver" %} "schema" + {% else %} schema + {% endif %}, + cast(null as {{ type_string() }}) as name, + cast(null as {{ type_string() }}) as source_name, + cast(null as {{ type_string() }}) as loaded_at_field, + cast(null as {{ type_int() }}) as warn_after_count, + cast(null as {{ type_string() }}) as warn_after_period, + cast(null as {{ type_int() }}) as error_after_count, + cast(null as {{ type_string() }}) as error_after_period, + cast(null as {{ type_timestamp() }}) as max_loaded_at, + cast(null as {{ type_timestamp() }}) as snapshotted_at, + cast(null as {{ type_float() }}) as age, + cast(null as {{ type_json() }}) as adapter_response +from dummy_cte +where 1 = 0 diff --git a/models/sources/source_executions.yml b/models/sources/source_executions.yml new file mode 100644 index 00000000..347bad55 --- /dev/null +++ b/models/sources/source_executions.yml @@ -0,0 +1,46 @@ +version: 2 + +models: + - name: source_executions + description: Base model for data about source freshness executions. One row per source freshness execution. 
+ columns: + - name: command_invocation_id + description: '{{ doc("command_invocation_id") }}' + - name: node_id + description: '{{ doc("node_id") }}' + - name: run_started_at + description: '{{ doc("run_started_at") }}' + - name: thread_id + description: '{{ doc("thread_id") }}' + - name: status + description: '{{ doc("status") }}' + - name: compile_started_at + description: '{{ doc("compile_started_at") }}' + - name: query_completed_at + description: '{{ doc("query_completed_at") }}' + - name: total_node_runtime + description: '{{ doc("total_node_runtime") }}' + - name: schema + description: '{{ doc("schema") }}' + - name: name + description: '{{ doc("name") }}' + - name: source_name + description: '{{ doc("source_name") }}' + - name: loaded_at_field + description: '{{ doc("loaded_at_field") }}' + - name: warn_after_count + description: '{{ doc("warn_after_count") }}' + - name: warn_after_period + description: '{{ doc("warn_after_period") }}' + - name: error_after_count + description: '{{ doc("error_after_count") }}' + - name: error_after_period + description: '{{ doc("error_after_period") }}' + - name: max_loaded_at + description: '{{ doc("max_loaded_at") }}' + - name: snapshotted_at + description: '{{ doc("snapshotted_at") }}' + - name: age + description: '{{ doc("age") }}' + - name: adapter_response + description: '{{ doc("adapter_response") }}' diff --git a/models/staging/stg_dbt__source_executions.sql b/models/staging/stg_dbt__source_executions.sql new file mode 100644 index 00000000..6f63b7c3 --- /dev/null +++ b/models/staging/stg_dbt__source_executions.sql @@ -0,0 +1,38 @@ +with base as ( + + select * + from {{ ref('source_executions') }} + +), + +enhanced as ( + + select + {{ dbt_artifacts.generate_surrogate_key(['command_invocation_id', 'node_id']) }} as source_execution_id, + command_invocation_id, + node_id, + run_started_at, + {{ split_part('thread_id', "'-'", 2) }} as thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + {% if target.type == "sqlserver" %} "schema" + {% else %} schema + {% endif %}, + name, + source_name, + loaded_at_field, + warn_after_count, + warn_after_period, + error_after_count, + error_after_period, + max_loaded_at, + snapshotted_at, + age, + adapter_response + from base + +) + +select * from enhanced diff --git a/models/staging/stg_dbt__source_executions.yml b/models/staging/stg_dbt__source_executions.yml new file mode 100644 index 00000000..6e6ef87d --- /dev/null +++ b/models/staging/stg_dbt__source_executions.yml @@ -0,0 +1,46 @@ +version: 2 + +models: + - name: stg_dbt__source_executions + description: Staging model for data about source freshness executions. One row per source freshness execution. 
+ columns: + - name: command_invocation_id + description: '{{ doc("command_invocation_id") }}' + - name: node_id + description: '{{ doc("node_id") }}' + - name: run_started_at + description: '{{ doc("run_started_at") }}' + - name: thread_id + description: '{{ doc("thread_id") }}' + - name: status + description: '{{ doc("status") }}' + - name: compile_started_at + description: '{{ doc("compile_started_at") }}' + - name: query_completed_at + description: '{{ doc("query_completed_at") }}' + - name: total_node_runtime + description: '{{ doc("total_node_runtime") }}' + - name: schema + description: '{{ doc("schema") }}' + - name: name + description: '{{ doc("name") }}' + - name: source_name + description: '{{ doc("source_name") }}' + - name: loaded_at_field + description: '{{ doc("loaded_at_field") }}' + - name: warn_after_count + description: '{{ doc("warn_after_count") }}' + - name: warn_after_period + description: '{{ doc("warn_after_period") }}' + - name: error_after_count + description: '{{ doc("error_after_count") }}' + - name: error_after_period + description: '{{ doc("error_after_period") }}' + - name: max_loaded_at + description: '{{ doc("max_loaded_at") }}' + - name: snapshotted_at + description: '{{ doc("snapshotted_at") }}' + - name: age + description: '{{ doc("age") }}' + - name: adapter_response + description: '{{ doc("adapter_response") }}' diff --git a/tox.ini b/tox.ini index 92803151..f5b69ccc 100644 --- a/tox.ini +++ b/tox.ini @@ -123,6 +123,7 @@ deps = dbt-snowflake~=1.9.0 commands = dbt clean dbt deps + dbt source freshness --target snowflake dbt build --target snowflake @@ -132,6 +133,7 @@ deps = dbt-snowflake~=1.3.0 commands = dbt clean dbt deps + dbt source freshness --target snowflake dbt build --target snowflake [testenv:integration_snowflake_1_4_0] @@ -140,6 +142,7 @@ deps = dbt-snowflake~=1.4.0 commands = dbt clean dbt deps + dbt source freshness --target snowflake dbt build --target snowflake [testenv:integration_snowflake_1_5_0] @@ -148,6 +151,7 @@ deps = dbt-snowflake~=1.5.0 commands = dbt clean dbt deps + dbt source freshness --target snowflake dbt build --target snowflake [testenv:integration_snowflake_1_6_0] @@ -156,6 +160,7 @@ deps = dbt-snowflake~=1.6.0 commands = dbt clean dbt deps + dbt source freshness --target snowflake dbt build --target snowflake [testenv:integration_snowflake_1_7_0] @@ -164,6 +169,7 @@ deps = dbt-snowflake~=1.7.0 commands = dbt clean dbt deps + dbt source freshness --target snowflake dbt build --target snowflake [testenv:integration_snowflake_1_8_0] @@ -172,6 +178,7 @@ deps = dbt-snowflake~=1.8.0 commands = dbt clean dbt deps + dbt source freshness --target snowflake dbt build --target snowflake [testenv:integration_snowflake_1_9_0] @@ -189,6 +196,7 @@ deps = dbt-databricks~=1.9.0 commands = dbt clean dbt deps + dbt source freshness --target databricks dbt build --target databricks [testenv:integration_databricks_1_3_0] @@ -197,6 +205,7 @@ deps = dbt-databricks~=1.3.0 commands = dbt clean dbt deps + dbt source freshness --target databricks dbt build --target databricks [testenv:integration_databricks_1_4_0] @@ -205,6 +214,7 @@ deps = dbt-databricks~=1.4.0 commands = dbt clean dbt deps + dbt source freshness --target databricks dbt build --target databricks [testenv:integration_databricks_1_5_0] @@ -213,6 +223,7 @@ deps = dbt-databricks~=1.5.0 commands = dbt clean dbt deps + dbt source freshness --target databricks dbt build --target databricks [testenv:integration_databricks_1_6_0] @@ -221,6 +232,7 @@ deps = dbt-databricks~=1.6.0 
commands = dbt clean dbt deps + dbt source freshness --target databricks dbt build --target databricks [testenv:integration_databricks_1_7_0] @@ -229,6 +241,7 @@ deps = dbt-databricks~=1.7.0 commands = dbt clean dbt deps + dbt source freshness --target databricks dbt build --target databricks [testenv:integration_databricks_1_8_0] @@ -237,6 +250,7 @@ deps = dbt-databricks~=1.8.0 commands = dbt clean dbt deps + dbt source freshness --target databricks dbt build --target databricks [testenv:integration_databricks_1_9_0] @@ -254,6 +268,7 @@ deps = dbt-bigquery~=1.9.0 commands = dbt clean dbt deps + dbt source freshness --target bigquery --vars '"my_var": "my value"' dbt build --target bigquery --vars '"my_var": "my value"' [testenv:integration_bigquery_1_3_0] @@ -262,6 +277,7 @@ deps = dbt-bigquery~=1.3.0 commands = dbt clean dbt deps + dbt source freshness --target bigquery --vars '"my_var": "my value"' dbt build --target bigquery --vars '"my_var": "my value"' [testenv:integration_bigquery_1_4_0] @@ -270,6 +286,7 @@ deps = dbt-bigquery~=1.4.0 commands = dbt clean dbt deps + dbt source freshness --target bigquery --vars '"my_var": "my value"' dbt build --target bigquery --vars '"my_var": "my value"' [testenv:integration_bigquery_1_5_0] @@ -278,6 +295,7 @@ deps = dbt-bigquery~=1.5.0 commands = dbt clean dbt deps + dbt source freshness --target bigquery --vars '"my_var": "my value"' dbt build --target bigquery --vars '"my_var": "my value"' [testenv:integration_bigquery_1_6_0] @@ -286,6 +304,7 @@ deps = dbt-bigquery~=1.6.0 commands = dbt clean dbt deps + dbt source freshness --target bigquery --vars '"my_var": "my value"' dbt build --target bigquery --vars '"my_var": "my value"' [testenv:integration_bigquery_1_7_0] @@ -294,6 +313,7 @@ deps = dbt-bigquery~=1.7.0 commands = dbt clean dbt deps + dbt source freshness --target bigquery --vars '"my_var": "my value"' dbt build --target bigquery --vars '"my_var": "my value"' [testenv:integration_bigquery_1_8_0] @@ -302,6 +322,7 @@ deps = dbt-bigquery~=1.8.0 commands = dbt clean dbt deps + dbt source freshness --target bigquery --vars '"my_var": "my value"' dbt build --target bigquery --vars '"my_var": "my value"' [testenv:integration_bigquery_1_9_0] @@ -319,6 +340,7 @@ deps = dbt-spark[ODBC]~=1.4.0 commands = dbt clean dbt deps + dbt source freshness --target spark dbt build --exclude snapshot --target spark [testenv:integration_postgres] @@ -329,6 +351,7 @@ deps = commands = dbt clean dbt deps + dbt source freshness --target postgres dbt build --target postgres [testenv:integration_postgres_1_3_0] @@ -337,6 +360,7 @@ deps = dbt-postgres~=1.3.0 commands = dbt clean dbt deps + dbt source freshness --target postgres dbt build --target postgres [testenv:integration_postgres_1_4_0] @@ -345,6 +369,7 @@ deps = dbt-postgres~=1.4.0 commands = dbt clean dbt deps + dbt source freshness --target postgres dbt build --target postgres [testenv:integration_postgres_1_5_0] @@ -353,6 +378,7 @@ deps = dbt-postgres~=1.5.0 commands = dbt clean dbt deps + dbt source freshness --target postgres dbt build --target postgres [testenv:integration_postgres_1_6_0] @@ -361,6 +387,7 @@ deps = dbt-postgres~=1.6.0 commands = dbt clean dbt deps + dbt source freshness --target postgres dbt build --target postgres [testenv:integration_postgres_1_7_0] @@ -369,6 +396,7 @@ deps = dbt-postgres~=1.7.0 commands = dbt clean dbt deps + dbt source freshness --target postgres dbt build --target postgres [testenv:integration_postgres_1_8_0] @@ -379,6 +407,7 @@ deps = commands = dbt clean dbt deps + 
dbt source freshness --target postgres dbt build --target postgres [testenv:integration_postgres_1_9_0]
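Taken together, these changes mean every `dbt source freshness` invocation now lands in `fct_dbt__source_executions` alongside the existing execution facts. As a usage sketch (not part of this change — the table qualification is hypothetical and the threshold logic is illustrative), here is one way to monitor how often each source breaches its freshness thresholds:

```sql
-- Illustrative monitoring query over the new fact model: share of
-- freshness checks per source that came back warn/error in the last 30 days.
select
    source_name,
    name,
    count(*) as checks,
    sum(case when status in ('warn', 'error') then 1 else 0 end) as breaches,
    max(snapshotted_at) as last_checked_at
from analytics.dbt_artifacts.fct_dbt__source_executions
where run_started_at >= dateadd('day', -30, current_timestamp())
group by source_name, name
order by breaches desc;
```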