diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7eeaa39..ea1c49a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,15 @@ Types of changes:
 
 ## [Unreleased]
 
+### Fixed
+
+- Improve nested/dotted column handling: when data is loaded from external sources via database accessors, nested
+  columns (e.g., `value.shopId`) are now aliased with underscores (`value_shopId`) for consistent querying. Native
+  DuckDB struct columns continue to use dot notation. This ensures proper handling in WHERE clauses, JOIN conditions,
+  and SELECT statements across all check types.
+- Consolidate date-type filters to be ORed together (instead of ANDed) when fetching data into memory, allowing multiple
+  date conditions to apply correctly.
+
 ## [0.11.2] - 2026-01-23
 
 ### Fixed
diff --git a/src/koality/checks.py b/src/koality/checks.py
index 326bda8..1fac204 100644
--- a/src/koality/checks.py
+++ b/src/koality/checks.py
@@ -114,13 +114,20 @@ def __init__(
     def in_memory_column(self) -> str:
         """Return the column name to reference in in-memory queries.
 
-        If a configured column references a nested field (e.g. "value.shopId"),
-        the in-memory representation uses the last segment ("shopId"). This
-        property provides that flattened name without modifying the original
+        If a configured column references a nested field (e.g. "value.shopId"):
+        - When querying data loaded via database_accessor: uses underscores ("value_shopId")
+          because the executor flattens struct columns with underscore aliases
+        - When querying existing DuckDB tables (no accessor): keeps dots ("value.shopId")
+          to support native DuckDB struct column syntax
+
+        This property provides the appropriate name without modifying the original
         configured `self.check_column` which is still used for result writing.
         """
-        if isinstance(self.check_column, str) and "." in self.check_column:
-            return self.check_column.split(".")[-1]
+        if isinstance(self.check_column, str) and "." in self.check_column:  # noqa: SIM102
+            # Only convert to underscores if data was loaded via database_accessor
+            # (which flattens structs). For native DuckDB tables, keep dotted notation.
+            if self.database_accessor:
+                return self.check_column.replace(".", "_")
         return self.check_column
 
     @property
@@ -391,15 +398,25 @@ def get_identifier_filter(filters: dict[str, dict[str, Any]]) -> tuple[str, dict
         return None
 
     @staticmethod
-    def assemble_where_statement(filters: dict[str, dict[str, Any]], *, strip_dotted_columns: bool = True) -> str:
+    def assemble_where_statement(  # noqa: C901
+        filters: dict[str, dict[str, Any]],
+        *,
+        strip_dotted_columns: bool = True,
+        database_accessor: str | None = None,
+    ) -> str:
         """Generate the where statement for the check query using the specified filters.
 
         Args:
            filters: A dict containing filter specifications, e.g.,
            strip_dotted_columns: When True (default), dotted column names (e.g. "a.b") are
-                reduced to their last component ("b") for WHERE clauses. If False, the full
-                dotted expression is preserved. This is useful when querying source databases
-                that expect the original dotted column syntax.
+                transformed based on the data source:
+                - With database_accessor: converted to underscores ("a_b") for flattened data
+                - Without database_accessor: kept as dots ("a.b") for native DuckDB structs
+                If False, the full dotted expression is preserved regardless (used when
+                querying source databases that expect the original dotted column syntax).
+            database_accessor: Optional database accessor string. When provided and non-empty,
+                indicates data was loaded from an external source and dotted columns should be
+                converted to underscores.
 
         Example filters:
             `{
@@ -445,17 +462,22 @@ def assemble_where_statement(filters: dict[str, dict[str, Any]], *, strip_dotted
             # If column is not provided we cannot build a WHERE condition
             if column is None:
                 continue
-            # If the column references a nested field (e.g. "value.shopId"),
-            # databases that flatten JSON may have the column stored without the
-            # prefix. By default we use the last component after the dot for the
-            # WHERE clause, but callers can disable this behavior by setting
-            # `strip_dotted_columns=False` (used when querying source DBs so the
-            # original dotted expression is preserved).
-            if isinstance(column, str) and "." in column and strip_dotted_columns:
-                column = column.split(".")[-1]
+            # If the column references a nested field (e.g. "value.shopId"):
+            # - With database_accessor: convert to underscores (value_shopId) for flattened data
+            # - Without database_accessor: keep dots (value.shopId) for native DuckDB structs
+            # Callers can disable this by setting `strip_dotted_columns=False`
+            # (used when querying source DBs where the original dotted expression is needed).
+            if isinstance(column, str) and "." in column and strip_dotted_columns and database_accessor:
+                column = column.replace(".", "_")
+                # else: keep dotted notation for native DuckDB struct support
 
             operator = filter_dict.get("operator", "=")
+            # Cast date columns for proper comparison
+            is_date_filter = filter_dict.get("type") == "date"
+            if is_date_filter:
+                column = f"CAST({column} AS DATE)"
+
             # Handle NULL values with IS NULL / IS NOT NULL
             if value is None:
                 if operator == "!=":
@@ -465,6 +487,9 @@ def assemble_where_statement(filters: dict[str, dict[str, Any]], *, strip_dotted
                 continue
 
             formatted_value = format_filter_value(value, operator)
+            # Prefix DATE for date type filters
+            if is_date_filter and operator not in ("BETWEEN", "IN", "NOT IN"):
+                formatted_value = f"DATE {formatted_value}"
             filters_statements.append(f" {column} {operator} {formatted_value}")
 
         if len(filters_statements) == 0:
@@ -547,7 +572,7 @@ def assemble_query(self) -> str:
         if isinstance(self, IqrOutlierCheck):
             filters = {name: cfg for name, cfg in filters.items() if cfg.get("type") != "date"}
 
-        if where_statement := self.assemble_where_statement(filters):
+        if where_statement := self.assemble_where_statement(filters, database_accessor=self.database_accessor):
             return main_query + "\n" + where_statement
 
         return main_query
@@ -561,7 +586,7 @@ def assemble_data_exists_query(self) -> str:
                 "{self.table}"
         """
 
-        if where_statement := self.assemble_where_statement(self.filters):
+        if where_statement := self.assemble_where_statement(self.filters, database_accessor=self.database_accessor):
             return f"{data_exists_query}\n{where_statement}"
 
         return data_exists_query
@@ -861,7 +886,7 @@ def assemble_query(self) -> str:
                 f"CAST({date_col} AS DATE) BETWEEN (DATE '{date_val}' - INTERVAL 14 DAY) AND DATE '{date_val}'"
             )  # TODO: maybe parameterize interval days
 
-        if where_statement := self.assemble_where_statement(self.filters):
+        if where_statement := self.assemble_where_statement(self.filters, database_accessor=self.database_accessor):
             return main_query + "\nAND\n" + where_statement.removeprefix("WHERE\n")
 
         return main_query
@@ -1220,7 +1245,7 @@ def assemble_query(self) -> str:
         order = {"max": "DESC", "min": "ASC"}[self.max_or_min]
         return f"""
         {self.query_boilerplate(self.transformation_statement())}
-        {self.assemble_where_statement(self.filters)}
+        {self.assemble_where_statement(self.filters, database_accessor=self.database_accessor)}
         GROUP BY {self.in_memory_column}
         ORDER BY {self.name} {order}
         LIMIT 1 -- only the first entry is needed
@@ -1343,14 +1368,27 @@ def assemble_name(self) -> str:
 
     def assemble_query(self) -> str:
         """Assemble the SQL query for calculating match rate between tables."""
-        right_column_statement = ",\n ".join(self.join_columns_right)
-
-        join_on_statement = "\n AND\n ".join(
-            [
-                f"lefty.{left_col} = righty.{right_col.split('.')[-1]}"
-                for left_col, right_col in zip(self.join_columns_left, self.join_columns_right, strict=False)
-            ],
-        )
+        # Transform dotted column names based on data source:
+        # - With database_accessor: convert to underscores (value.shopId → value_shopId) for flattened data
+        # - Without database_accessor: keep dots for SELECT (value.shopId), use last part for JOIN (shopId)
+        if self.database_accessor:
+            right_column_statement = ",\n ".join([col.replace(".", "_") for col in self.join_columns_right])
+            join_on_statement = "\n AND\n ".join(
+                [
+                    f"lefty.{left_col.replace('.', '_')} = righty.{right_col.replace('.', '_')}"
+                    for left_col, right_col in zip(self.join_columns_left, self.join_columns_right, strict=False)
+                ],
+            )
+        else:
+            # For native DuckDB struct columns: SELECT uses dotted notation,
+            # but DuckDB names the result column as just the last part
+            right_column_statement = ",\n ".join(self.join_columns_right)
+            join_on_statement = "\n AND\n ".join(
+                [
+                    f"lefty.{left_col.split('.')[-1]} = righty.{right_col.split('.')[-1]}"
+                    for left_col, right_col in zip(self.join_columns_left, self.join_columns_right, strict=False)
+                ],
+            )
 
         return f"""
         WITH
@@ -1360,14 +1398,14 @@ def assemble_query(self) -> str:
                 TRUE AS in_right_table
             FROM
                 "{self.right_table}"
-            {self.assemble_where_statement(self.filters_right)}
+            {self.assemble_where_statement(self.filters_right, database_accessor=self.database_accessor)}
         ),
 
         lefty AS (
             SELECT
                 *
             FROM
                 "{self.left_table}"
-            {self.assemble_where_statement(self.filters_left)}
+            {self.assemble_where_statement(self.filters_left, database_accessor=self.database_accessor)}
         )
 
         SELECT
@@ -1397,7 +1435,7 @@ def assemble_data_exists_query(self) -> str:
                 COUNT(*) AS right_counter,
             FROM
                 "{self.right_table}"
-            {self.assemble_where_statement(self.filters_right)}
+            {self.assemble_where_statement(self.filters_right, database_accessor=self.database_accessor)}
         ),
 
         lefty AS (
@@ -1405,7 +1443,7 @@ def assemble_data_exists_query(self) -> str:
                 COUNT(*) AS left_counter,
             FROM
                 "{self.left_table}"
-            {self.assemble_where_statement(self.filters_left)}
+            {self.assemble_where_statement(self.filters_left, database_accessor=self.database_accessor)}
         )
 
         SELECT
@@ -1508,7 +1546,10 @@ def assemble_name(self) -> str:
 
     def assemble_query(self) -> str:
         """Assemble the SQL query for calculating relative count change."""
-        where_statement = self.assemble_where_statement(self.filters).replace("WHERE", "AND")
+        where_statement = self.assemble_where_statement(self.filters, database_accessor=self.database_accessor).replace(
+            "WHERE",
+            "AND",
+        )
 
         date_col = self.date_filter["column"]
         date_val = self.date_filter["value"]
@@ -1573,7 +1614,7 @@ def assemble_data_exists_query(self) -> str:
         date_col = self.date_filter["column"]
         date_val = self.date_filter["value"]
 
-        where_statement = self.assemble_where_statement(self.filters)
+        where_statement = self.assemble_where_statement(self.filters, database_accessor=self.database_accessor)
         if where_statement:
             return f"{data_exists_query}\n{where_statement} AND CAST({date_col} AS DATE) = DATE '{date_val}'"
         return f"{data_exists_query}\nWHERE CAST({date_col} AS DATE) = DATE '{date_val}'"
@@ -1693,7 +1734,7 @@ def transformation_statement(self) -> str:
         if filters:
             filter_columns = ",\n".join([v["column"] for v in filters.values()])
             filter_columns = ",\n" + filter_columns
-            where_statement = self.assemble_where_statement(filters)
+            where_statement = self.assemble_where_statement(filters, database_accessor=self.database_accessor)
             where_statement = "\nAND\n" + where_statement.removeprefix("WHERE\n")
 
         return f"""
         WITH
@@ -1770,7 +1811,7 @@ def assemble_data_exists_query(self) -> str:
 
         filters = {k: v for k, v in self.filters.items() if v["type"] != "date"}
 
-        where_statement = self.assemble_where_statement(filters)
+        where_statement = self.assemble_where_statement(filters, database_accessor=self.database_accessor)
         if where_statement:
             where_statement = f"{where_statement} AND CAST({date_col} AS DATE) = DATE '{date_val}'"
         else:
diff --git a/src/koality/executor.py b/src/koality/executor.py
index 65b85b8..247a1d6 100644
--- a/src/koality/executor.py
+++ b/src/koality/executor.py
@@ -340,7 +340,9 @@ def fetch_data_into_memory(self, data_requirements: defaultdict[str, defaultdict
                 select_parts.append("*")
                 continue
             if isinstance(col, str) and "." in col:
-                flat = col.split(".")[-1]
+                # Replace dots with underscores for deterministic aliasing
+                # e.g., "value.shopId" becomes "value_shopId"
+                flat = col.replace(".", "_")
                 # Make flattened name unique if duplicate arises
                 base = flat
                 idx = 1
@@ -364,13 +366,15 @@ def fetch_data_into_memory(self, data_requirements: defaultdict[str, defaultdict
                 select_parts.append(col)
         columns = ", ".join(select_parts)
 
-        # Combine all unique filter groups. Treat date-range filters specially and
-        # combine them with other filters using AND (date range applies to all other filters).
+        # Combine all unique filter groups. Separate date filters from other filters.
+        # All date-related conditions (BETWEEN ranges and date equality) should be ORed.
+        # Non-date filters should be ANDed with the date conditions.
         date_filters_sql = set()
         other_filters_sql = set()
         for filter_group in requirements["filters"]:
             filter_dict = {}
+            date_filter_dict = {}
             for item in filter_group:
                 # Expect each item to be a (name, frozenset(cfg_items)) tuple
                 if not (isinstance(item, tuple) and len(item) == _DATE_RANGE_TUPLE_SIZE):
@@ -390,8 +394,24 @@ def fetch_data_into_memory(self, data_requirements: defaultdict[str, defaultdict
                     date_filters_sql.add(f"({cond})")
                     # date_range handled; continue to next group
                     continue
-                filter_dict[name] = dict(cfg)
+                # Separate date-type filters from other filters
+                if cfg.get("type") == "date":
+                    date_filter_dict[name] = dict(cfg)
+                else:
+                    filter_dict[name] = dict(cfg)
+
+            # Process date filters separately and add to date_filters_sql
+            if date_filter_dict:
+                where_clause = DataQualityCheck.assemble_where_statement(
+                    date_filter_dict,
+                    strip_dotted_columns=False,
+                )
+                if where_clause.strip().startswith("WHERE"):
+                    conditions = where_clause.strip()[len("WHERE") :].strip()
+                    if conditions:
+                        date_filters_sql.add(f"({conditions})")
+
+            # Process non-date filters
             if filter_dict:
                 # When fetching from the source DB, preserve dotted column expressions
                 # (e.g., "value.shopId") in the WHERE so the source provider sees the
@@ -403,7 +423,7 @@ def fetch_data_into_memory(self, data_requirements: defaultdict[str, defaultdict
                     if conditions:
                         other_filters_sql.add(f"({conditions})")
 
-        # Build final WHERE clause: if we have date filters, AND them with other filters (if any).
+        # Build final WHERE clause: OR all date filters together, AND with other filters.
         final_where_clause = ""
         if date_filters_sql and other_filters_sql:
             date_part = " OR ".join(sorted(date_filters_sql))
diff --git a/src/koality/models.py b/src/koality/models.py
index 56e0f08..e3b419b 100644
--- a/src/koality/models.py
+++ b/src/koality/models.py
@@ -192,6 +192,32 @@ def persist_results(self) -> bool:
 class _Check(_LocalDefaults):
     """Base model for all check configurations."""
 
+    @model_validator(mode="after")
+    def validate_filters_have_columns(self) -> Self:
+        """Validate that all filters with concrete values have columns specified.
+
+        This validation runs after defaults merging, ensuring the final filter
+        configuration is complete.
+        """
+        for filter_name, filter_config in self.filters.items():
+            # Skip identifier filters without concrete values (naming-only)
+            if filter_config.type == "identifier" and (filter_config.value is None or filter_config.value == "*"):
+                continue
+
+            # Skip partial filters with no value
+            if filter_config.value is None:
+                continue
+
+            # Filter has a value but no column - this is an error
+            if filter_config.column is None:
+                msg = (
+                    f"Filter '{filter_name}' has value '{filter_config.value}' "
+                    f"but no column specified. Add 'column: <column_name>' to the filter definition."
+                )
+                raise ValueError(msg)
+
+        return self
+
 
 class _SingleTableCheck(_Check):
     """Base model for checks that operate on a single table."""
diff --git a/tests/integration/test_executor_fetch_roll_window.py b/tests/integration/test_executor_fetch_roll_window.py
new file mode 100644
index 0000000..c055d7e
--- /dev/null
+++ b/tests/integration/test_executor_fetch_roll_window.py
@@ -0,0 +1,82 @@
+"""Integration test for executor fetch with rolling windows."""
+
+import duckdb
+
+from koality.checks import RelCountChangeCheck
+from koality.executor import CheckExecutor
+from koality.models import Config, _CheckBundle, _GlobalDefaults
+
+
+def test_fetch_data_into_memory_respects_rolling_window() -> None:
+    """Ensure fetch_data_into_memory restricts rows to the rolling date window."""
+    conn = duckdb.connect(":memory:")
+
+    # Create dummy_table with DATE and shop_id
+    conn.execute(
+        """
+        CREATE TABLE dummy_table (
+            DATE DATE,
+            shop_id VARCHAR,
+            product_number VARCHAR
+        )
+        """,
+    )
+
+    # Insert rows spanning multiple dates and shops
+    conn.execute(
+        """
+        INSERT INTO dummy_table VALUES
+            ('2023-01-01', 'SHOP001', 'P1'),
+            ('2023-01-02', 'SHOP001', 'P2'),
+            ('2023-01-03', 'SHOP001', 'P3'),
+            ('2023-01-01', 'SHOP002', 'P4'),
+            ('2023-01-04', 'SHOP001', 'P5')
+        """,
+    )
+
+    # Create a RelCountChangeCheck targeting 2023-01-03 with rolling_days=2
+    check = RelCountChangeCheck(
+        database_accessor="",
+        database_provider=None,
+        table="dummy_table",
+        check_column="product_number",
+        rolling_days=2,
+        filters={
+            "date": {"column": "DATE", "value": "2023-01-03", "type": "date"},
+            "shop_id": {"column": "shop_id", "value": "SHOP001", "type": "identifier"},
+        },
+    )
+
+    # Minimal Config for executor - database_setup empty and accessor set to empty string
+    cfg = Config(
+        name="test",
+        database_setup="",
+        database_accessor="",
+        defaults=_GlobalDefaults(),
+        check_bundles=[_CheckBundle(name="bundle", checks=[])],
+    )
+
+    executor = CheckExecutor(cfg, duckdb_client=conn)
+    # Add our check directly
+    executor.checks.append(check)
+
+    # Get data requirements and fetch into memory
+    data_reqs = executor.get_data_requirements()
+    executor.fetch_data_into_memory(data_reqs)
+
+    # Query the in-memory table
+    # Normalize DATE to ISO string for comparison
+    res = [
+        (row[0].isoformat(), row[1], row[2])
+        for row in conn.execute(
+            'SELECT DATE, shop_id, product_number FROM "dummy_table" ORDER BY DATE, product_number',
+        ).fetchall()
+    ]
+
+    expected = [
+        ("2023-01-01", "SHOP001", "P1"),
+        ("2023-01-02", "SHOP001", "P2"),
+        ("2023-01-03", "SHOP001", "P3"),
+    ]
+
+    assert res == expected
diff --git a/tests/integration/test_match_rate_check.py b/tests/integration/test_match_rate_check.py
index a134c6c..ec301e4 100644
--- a/tests/integration/test_match_rate_check.py
+++ b/tests/integration/test_match_rate_check.py
@@ -181,6 +181,73 @@ def test_match_rate_check_different_join_col_names(duckdb_client_renamed: duckdb
     assert result["VALUE"] == 0.8
 
 
+def test_match_rate_check_dotted_column_names(duckdb_client_dotted_columns: duckdb.DuckDBPyConnection) -> None:
+    """Test match rate check with dotted column names in join_columns_right (like 'value.shopId').
+
+    This tests the fix where dotted column names in join_columns_right are converted to
+    underscore-aliased names (value.shopId → value_shopId) to match the column names
+    created when loading data from external sources.
+    The test simulates how BigQuery struct columns are aliased - in the actual table
+    they're stored as 'value_shopId', but in the config they're referenced as 'value.shopId'.
+
+    Note: Uses a non-empty database_accessor to indicate data was loaded/flattened.
+    """
+    check = MatchRateCheck(
+        database_accessor="bigquery.project.dataset",  # Non-empty to trigger underscore conversion
+        database_provider=None,
+        left_table="tracking_purchase",
+        right_table="skufeed",
+        join_columns_left=["BQ_PARTITIONTIME", "shopId", "product_number"],
+        join_columns_right=["BQ_PARTITIONTIME", "value.shopId", "product_number"],
+        check_column="product_number",
+        filters={
+            "partition_date": {"column": "BQ_PARTITIONTIME", "value": "2023-01-01", "type": "date"},
+        },
+        filters_left={
+            "shop_id": {"column": "shopId", "value": "EC1705", "type": "identifier"},
+        },
+        filters_right={
+            "shop_id": {"column": "value.shopId", "value": "EC1705", "type": "identifier"},
+        },
+    )
+    result = check(duckdb_client_dotted_columns)
+    # 3 / 4 product_numbers found (PROD-999 not in skufeed)
+    assert result["VALUE"] == 0.75
+
+
+def test_match_rate_check_with_struct_columns(duckdb_client_with_struct: duckdb.DuckDBPyConnection) -> None:
+    """Test match rate check against native DuckDB STRUCT columns using dotted notation.
+
+    This covers the native-struct path: with an empty database_accessor the data is not
+    flattened, so dotted config references like 'value.shopId' keep their dot notation.
+    The check ensures that:
+    1. The SELECT DISTINCT statement uses the dotted notation ('value.shopId') to access the struct field
+    2. The JOIN conditions use the last segment ('righty.shopId'), since DuckDB names the selected struct field that way
+    3. The WHERE clauses keep the dotted notation (no underscore conversion without an accessor)
+    """
+    check = MatchRateCheck(
+        database_accessor="",
+        database_provider=None,
+        left_table="tracking_purchase",
+        right_table="skufeed",
+        join_columns_left=["BQ_PARTITIONTIME", "shopId", "product_number"],
+        join_columns_right=["BQ_PARTITIONTIME", "value.shopId", "product_number"],  # Config uses dotted notation
+        check_column="product_number",
+        filters={
+            "partition_date": {"column": "BQ_PARTITIONTIME", "value": "2023-01-01", "type": "date"},
+        },
+        filters_left={
+            "shop_id": {"column": "shopId", "value": "EC1705", "type": "identifier"},
+        },
+        filters_right={
+            "shop_id": {"column": "value.shopId", "value": "EC1705", "type": "identifier"},
+        },
+    )
+    result = check(duckdb_client_with_struct)
+    # 3 / 4 product_numbers found (PROD-999 not in skufeed)
+    assert result["VALUE"] == 0.75
+
+
 @pytest.mark.parametrize(
     ("day", "shop"),
     [
@@ -211,6 +278,99 @@ def test_match_rate_check_no_data(duckdb_client: duckdb.DuckDBPyConnection, day:
     assert result["IDENTIFIER"] == f"shop_code={shop}"
 
 
+@pytest.fixture
+def duckdb_client_dotted_columns() -> duckdb.DuckDBPyConnection:
+    """Create an in-memory DuckDB connection with underscore-aliased column names.
+
+    This simulates data loaded from external sources where struct columns like 'value.shopId'
+    are aliased to 'value_shopId' using deterministic underscore replacement.
+    """
+    conn = duckdb.connect(":memory:")
+
+    # Create tracking table
+    conn.execute("""
+        CREATE TABLE tracking_purchase (
+            BQ_PARTITIONTIME DATE,
+            shopId VARCHAR,
+            product_number VARCHAR
+        )
+    """)
+
+    conn.execute("""
+        INSERT INTO tracking_purchase VALUES
+            ('2023-01-01', 'EC1705', 'PROD-001'),
+            ('2023-01-01', 'EC1705', 'PROD-002'),
+            ('2023-01-01', 'EC1705', 'PROD-003'),
+            ('2023-01-01', 'EC1705', 'PROD-999')
+    """)
+
+    # Create skufeed table with underscore-aliased struct columns
+    # In config these are referenced as 'value.shopId', in DuckDB they're 'value_shopId'
+    conn.execute("""
+        CREATE TABLE skufeed (
+            BQ_PARTITIONTIME DATE,
+            value_shopId VARCHAR,
+            clickstreamSkuId VARCHAR,
+            product_number VARCHAR
+        )
+    """)
+
+    conn.execute("""
+        INSERT INTO skufeed VALUES
+            ('2023-01-01', 'EC1705', 'SKU-001', 'PROD-001'),
+            ('2023-01-01', 'EC1705', 'SKU-002', 'PROD-002'),
+            ('2023-01-01', 'EC1705', 'SKU-003', 'PROD-003')
+    """)
+
+    return conn
+
+
+@pytest.fixture
+def duckdb_client_with_struct() -> duckdb.DuckDBPyConnection:
+    """Create an in-memory DuckDB connection with actual STRUCT columns.
+
+    This fixture creates tables with real nested STRUCT columns (like BigQuery has)
+    to test that dotted notation in config (value.shopId) properly accesses nested fields.
+    """
+    conn = duckdb.connect(":memory:")
+
+    # Create tracking table with regular columns
+    conn.execute("""
+        CREATE TABLE tracking_purchase (
+            BQ_PARTITIONTIME DATE,
+            shopId VARCHAR,
+            product_number VARCHAR
+        )
+    """)
+
+    conn.execute("""
+        INSERT INTO tracking_purchase VALUES
+            ('2023-01-01', 'EC1705', 'PROD-001'),
+            ('2023-01-01', 'EC1705', 'PROD-002'),
+            ('2023-01-01', 'EC1705', 'PROD-003'),
+            ('2023-01-01', 'EC1705', 'PROD-999')
+    """)
+
+    # Create skufeed table with actual STRUCT columns (nested data)
+    # This represents raw BigQuery data with struct types
+    conn.execute("""
+        CREATE TABLE skufeed (
+            BQ_PARTITIONTIME DATE,
+            value STRUCT(shopId VARCHAR, clickstreamSkuId VARCHAR),
+            product_number VARCHAR
+        )
+    """)
+
+    conn.execute("""
+        INSERT INTO skufeed VALUES
+            ('2023-01-01', {'shopId': 'EC1705', 'clickstreamSkuId': 'SKU-001'}, 'PROD-001'),
+            ('2023-01-01', {'shopId': 'EC1705', 'clickstreamSkuId': 'SKU-002'}, 'PROD-002'),
+            ('2023-01-01', {'shopId': 'EC1705', 'clickstreamSkuId': 'SKU-003'}, 'PROD-003')
+    """)
+
+    return conn
+
+
 @pytest.fixture
 def duckdb_client_one_table_empty() -> duckdb.DuckDBPyConnection:
     """Create an in-memory DuckDB connection with one empty table."""
diff --git a/tests/integration/test_underscore_aliasing.py b/tests/integration/test_underscore_aliasing.py
new file mode 100644
index 0000000..bbfdffc
--- /dev/null
+++ b/tests/integration/test_underscore_aliasing.py
@@ -0,0 +1,238 @@
+"""Integration test for underscore aliasing of nested column names."""
+
+import duckdb
+import pytest
+
+from koality.checks import MatchRateCheck, NullRatioCheck
+
+pytestmark = pytest.mark.integration
+
+
+def test_underscore_aliasing_with_struct_columns() -> None:
+    """Test that nested/struct columns are aliased with underscores when loaded into cache.
+
+    This test verifies the complete flow:
+    1. Data with struct columns (value.shopId, value.productId) gets loaded
+    2. Columns are aliased as value_shopId, value_productId in DuckDB
+    3. Checks reference dotted notation (value.shopId) in config
+    4. Check queries automatically convert to underscore notation (value_shopId)
+    """
+    conn = duckdb.connect(":memory:")
+
+    # Create source table with actual STRUCT columns
+    conn.execute("""
+        CREATE TABLE source_data (
+            BQ_PARTITIONTIME DATE,
+            value STRUCT(shopId VARCHAR, productId VARCHAR, price DOUBLE)
+        )
+    """)
+
+    conn.execute("""
+        INSERT INTO source_data VALUES
+            ('2023-01-01', {'shopId': 'EC1705', 'productId': 'PROD-001', 'price': 19.99}),
+            ('2023-01-01', {'shopId': 'EC1705', 'productId': 'PROD-002', 'price': 29.99}),
+            ('2023-01-01', {'shopId': 'EC1705', 'productId': NULL, 'price': 39.99}),
+            ('2023-01-01', {'shopId': 'EC1706', 'productId': 'PROD-004', 'price': 49.99})
+    """)
+
+    # Simulate data loading with flattening (as executor would do)
+    # This creates table with underscore-aliased column names
+    conn.execute("""
+        CREATE TABLE loaded_data AS
+        SELECT
+            BQ_PARTITIONTIME,
+            value.shopId AS value_shopId,
+            value.productId AS value_productId,
+            value.price AS value_price
+        FROM source_data
+    """)
+
+    # Create check using dotted notation (as in config) AND non-empty database_accessor
+    check = NullRatioCheck(
+        database_accessor="bigquery.project.dataset",  # Non-empty to trigger underscore conversion
+        database_provider=None,
+        table="loaded_data",
+        check_column="value.productId",  # Config uses dotted notation
+        filters={
+            "partition_date": {"column": "BQ_PARTITIONTIME", "value": "2023-01-01", "type": "date"},
+            "shop_id": {"column": "value.shopId", "value": "EC1705", "type": "identifier"},  # Filter also uses dotted
+        },
+    )
+
+    result = check(conn)
+    # 1 NULL out of 3 rows (EC1705 only) = 0.333
+    assert abs(result["VALUE"] - 0.333) < 0.01
+
+
+def test_underscore_aliasing_matchrate_with_mixed_columns() -> None:
+    """Test MatchRateCheck with mix of regular and underscore-aliased columns.
+
+    This tests a realistic scenario where:
+    - Left table has regular flat columns
+    - Right table has underscore-aliased struct columns
+    - Config references right table columns with dots
+    """
+    conn = duckdb.connect(":memory:")
+
+    # Left table: regular flat columns (simulating already-processed data)
+    conn.execute("""
+        CREATE TABLE orders (
+            order_date DATE,
+            shop_id VARCHAR,
+            product_id VARCHAR
+        )
+    """)
+
+    conn.execute("""
+        INSERT INTO orders VALUES
+            ('2023-01-01', 'EC1705', 'PROD-001'),
+            ('2023-01-01', 'EC1705', 'PROD-002'),
+            ('2023-01-01', 'EC1705', 'PROD-003'),
+            ('2023-01-01', 'EC1705', 'PROD-999')
+    """)
+
+    # Right table: underscore-aliased struct columns (simulating loaded BigQuery data)
+    conn.execute("""
+        CREATE TABLE products (
+            date DATE,
+            value_shopId VARCHAR,
+            value_productId VARCHAR,
+            value_inStock BOOLEAN
+        )
+    """)
+
+    conn.execute("""
+        INSERT INTO products VALUES
+            ('2023-01-01', 'EC1705', 'PROD-001', TRUE),
+            ('2023-01-01', 'EC1705', 'PROD-002', TRUE),
+            ('2023-01-01', 'EC1705', 'PROD-003', FALSE)
+    """)
+
+    # Create MatchRateCheck with dotted notation for right table AND non-empty database_accessor
+    check = MatchRateCheck(
+        database_accessor="bigquery.project.dataset",  # Non-empty to trigger underscore conversion
+        database_provider=None,
+        left_table="orders",
+        right_table="products",
+        join_columns_left=["order_date", "shop_id", "product_id"],
+        join_columns_right=["date", "value.shopId", "value.productId"],  # Dotted notation
+        check_column="product_id",
+        filters_left={
+            "date": {"column": "order_date", "value": "2023-01-01", "type": "date"},
+            "shop": {"column": "shop_id", "value": "EC1705", "type": "identifier"},
+        },
+        filters_right={
+            "date": {"column": "date", "value": "2023-01-01", "type": "date"},
+            "shop": {"column": "value.shopId", "value": "EC1705", "type": "identifier"},  # Dotted in filter
+        },
+    )
+
+    result = check(conn)
+    # 3 out of 4 products found (PROD-999 missing)
+    assert result["VALUE"] == 0.75
+
+
+def test_underscore_aliasing_matchrate_with_dotted_left_columns() -> None:
+    """Test MatchRateCheck when BOTH left and right columns have dotted names.
+
+    This tests the scenario from the user's config where left table also has
+    struct columns (like orderLine.skuId) that get flattened to orderLine_skuId.
+    """
+    conn = duckdb.connect(":memory:")
+
+    # Left table with underscore-aliased columns
+    conn.execute("""
+        CREATE TABLE purchases (
+            BQ_PARTITIONTIME DATE,
+            shopId VARCHAR,
+            orderLine_skuId VARCHAR
+        )
+    """)
+
+    conn.execute("""
+        INSERT INTO purchases VALUES
+            ('2023-01-01', 'EC1701', 'SKU-001'),
+            ('2023-01-01', 'EC1701', 'SKU-002'),
+            ('2023-01-01', 'EC1701', 'SKU-003'),
+            ('2023-01-01', 'EC1701', 'SKU-999')
+    """)
+
+    # Right table with underscore-aliased columns
+    conn.execute("""
+        CREATE TABLE skufeed (
+            BQ_PARTITIONTIME DATE,
+            value_shopId VARCHAR,
+            value_clickstreamskuId VARCHAR
+        )
+    """)
+
+    conn.execute("""
+        INSERT INTO skufeed VALUES
+            ('2023-01-01', 'EC1701', 'SKU-001'),
+            ('2023-01-01', 'EC1701', 'SKU-002'),
+            ('2023-01-01', 'EC1701', 'SKU-003')
+    """)
+
+    # Create MatchRateCheck with dotted notation in BOTH left and right columns
+    check = MatchRateCheck(
+        database_accessor="bigquery.project.dataset",
+        database_provider=None,
+        left_table="purchases",
+        right_table="skufeed",
+        join_columns_left=["BQ_PARTITIONTIME", "shopId", "orderLine.skuId"],  # Left has dot!
+        join_columns_right=["BQ_PARTITIONTIME", "value.shopId", "value.clickstreamskuId"],
+        check_column="orderLine.skuId",
+        filters_left={
+            "date": {"column": "BQ_PARTITIONTIME", "value": "2023-01-01", "type": "date"},
+            "shop": {"column": "shopId", "value": "EC1701", "type": "identifier"},
+        },
+        filters_right={
+            "date": {"column": "BQ_PARTITIONTIME", "value": "2023-01-01", "type": "date"},
+            "shop": {"column": "value.shopId", "value": "EC1701", "type": "identifier"},
+        },
+    )
+
+    result = check(conn)
+    # 3 out of 4 SKUs found (SKU-999 not in skufeed)
+    assert result["VALUE"] == 0.75
+
+
+def test_underscore_aliasing_with_multiple_nesting_levels() -> None:
+    """Test underscore aliasing with deeply nested column names.
+
+    Verifies that multi-level nesting like 'data.value.shopId' becomes 'data_value_shopId'.
+    """
+    conn = duckdb.connect(":memory:")
+
+    # Create table with multi-level underscore-aliased columns
+    conn.execute("""
+        CREATE TABLE nested_data (
+            date DATE,
+            data_value_shopId VARCHAR,
+            data_value_price DOUBLE,
+            data_meta_source VARCHAR
+        )
+    """)
+
+    conn.execute("""
+        INSERT INTO nested_data VALUES
+            ('2023-01-01', 'EC1705', 19.99, 'API'),
+            ('2023-01-01', 'EC1705', NULL, 'API'),
+            ('2023-01-01', 'EC1706', 29.99, 'MANUAL')
+    """)
+
+    # Check using multi-level dotted notation AND non-empty database_accessor
+    check = NullRatioCheck(
+        database_accessor="bigquery.project.dataset",  # Non-empty to trigger underscore conversion
+        database_provider=None,
+        table="nested_data",
+        check_column="data.value.price",  # Three-level nesting
+        filters={
+            "date": {"column": "date", "value": "2023-01-01", "type": "date"},
+            "shop": {"column": "data.value.shopId", "value": "EC1705", "type": "identifier"},  # Multi-level in filter
+        },
+    )
+
+    result = check(conn)
+    # 1 NULL out of 2 rows (EC1705 only)
+    assert result["VALUE"] == 0.5
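
For context, a minimal sketch of the column-aliasing rule the patch above applies, as described in the changelog entry and docstrings. This is illustrative only and not part of the patch; the helper name alias_nested_column is hypothetical and merely mirrors the convention now shared by the checks and the executor: dotted config columns are flattened to underscore aliases only when a non-empty database_accessor indicates the data was loaded and flattened, otherwise the dotted form is kept for native DuckDB structs.

    def alias_nested_column(column: str, database_accessor: str | None) -> str:
        """Illustrative sketch of the dotted-column aliasing rule used in this patch.

        - Loaded via a non-empty database_accessor: the executor flattens structs,
          so "value.shopId" is referenced as "value_shopId".
        - No accessor (native DuckDB table or struct): keep "value.shopId" as-is.
        """
        if "." in column and database_accessor:
            return column.replace(".", "_")
        return column

    # Examples matching the tests added in this patch:
    assert alias_nested_column("value.shopId", "bigquery.project.dataset") == "value_shopId"
    assert alias_nested_column("data.value.price", "bigquery.project.dataset") == "data_value_price"
    assert alias_nested_column("value.shopId", "") == "value.shopId"  # native struct path keeps dots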