diff --git a/cwms/catalog/blobs.py b/cwms/catalog/blobs.py index 35b5b853..656d8052 100644 --- a/cwms/catalog/blobs.py +++ b/cwms/catalog/blobs.py @@ -1,9 +1,9 @@ import base64 -from typing import Optional +from typing import Any, Optional import cwms.api as api from cwms.cwms_types import JSON, Data -from cwms.utils.checks import is_base64 +from cwms.utils.checks import has_invalid_chars, is_base64 STORE_DICT = """data = { "office-id": "SWT", @@ -14,6 +14,8 @@ } """ +IGNORED_ID = "ignored" + def get_blob(blob_id: str, office_id: str) -> str: """Get a single BLOB (Binary Large Object). @@ -29,8 +31,13 @@ def get_blob(blob_id: str, office_id: str) -> str: str: the value returned based on the content-type it was stored with as a string """ - endpoint = f"blobs/{blob_id}" - params = {"office": office_id} + params: dict[str, Any] = {} + if has_invalid_chars(blob_id): + endpoint = f"blobs/{IGNORED_ID}" + params["blob-id"] = blob_id + else: + endpoint = f"blobs/{blob_id}" + params["office"] = office_id response = api.get(endpoint, params, api_version=1) return str(response) @@ -107,8 +114,13 @@ def delete_blob(blob_id: str, office_id: str) -> None: None """ - endpoint = f"blobs/{blob_id}" - params = {"office": office_id} + params: dict[str, Any] = {} + if has_invalid_chars(blob_id): + endpoint = f"blobs/{IGNORED_ID}" + params["blob-id"] = blob_id + else: + endpoint = f"blobs/{blob_id}" + params["office"] = office_id return api.delete(endpoint, params, api_version=1) @@ -143,6 +155,11 @@ def update_blob(data: JSON, fail_if_not_exists: Optional[bool] = True) -> None: blob_id = data.get("id", "").upper() - endpoint = f"blobs/{blob_id}" - params = {"fail-if-not-exists": fail_if_not_exists} + params: dict[str, Any] = {} + if has_invalid_chars(blob_id): + endpoint = f"blobs/{IGNORED_ID}" + params["blob-id"] = blob_id + else: + endpoint = f"blobs/{blob_id}" + params["fail-if-not-exists"] = fail_if_not_exists return api.patch(endpoint, data, params, api_version=1) diff --git a/cwms/catalog/clobs.py b/cwms/catalog/clobs.py index 16506431..617c108b 100644 --- a/cwms/catalog/clobs.py +++ b/cwms/catalog/clobs.py @@ -1,10 +1,21 @@ -from typing import Optional +from typing import Any, Optional import cwms.api as api from cwms.cwms_types import JSON, Data +from cwms.utils.checks import has_invalid_chars +STORE_DICT = """data = { + "office-id": "SWT", + "id": "CLOB_ID", + "description": "Your description here", + "value": "STRING of content" +} +""" -def get_clob(clob_id: str, office_id: str, clob_id_query: Optional[str] = None) -> Data: +IGNORED_ID = "ignored" + + +def get_clob(clob_id: str, office_id: str) -> Data: """Get a single clob. Parameters @@ -13,16 +24,6 @@ def get_clob(clob_id: str, office_id: str, clob_id_query: Optional[str] = None) Specifies the id of the clob office_id: string Specifies the office of the clob. - clob_id_query: string - If this query parameter is provided the id path parameter is ignored and the - value of the query parameter is used. Note: this query parameter is necessary - for id's that contain '/' or other special characters. Because of abuse even - properly escaped '/' in url paths are blocked. When using this query parameter - a valid path parameter must still be provided for the request to be properly - routed. If your clob id contains '/' you can't specify the clob-id query - parameter and also specify the id path parameter because firewall and/or server - rules will deny the request even though you are specifying this override. "ignored" - is suggested. 
Returns @@ -30,11 +31,13 @@ def get_clob(clob_id: str, office_id: str, clob_id_query: Optional[str] = None) cwms data type. data.json will return the JSON output and data.df will return a dataframe """ - endpoint = f"clobs/{clob_id}" - params = { - "office": office_id, - "clob-id-query": clob_id_query, - } + params: dict[str, Any] = {} + if has_invalid_chars(clob_id): + endpoint = f"clobs/{IGNORED_ID}" + params["clob-id"] = clob_id + else: + endpoint = f"clobs/{clob_id}" + params["office"] = office_id response = api.get(endpoint, params) return Data(response) @@ -90,13 +93,20 @@ def delete_clob(clob_id: str, office_id: str) -> None: None """ - endpoint = f"clobs/{clob_id}" - params = {"office": office_id} + params: dict[str, Any] = {} + if has_invalid_chars(clob_id): + endpoint = f"clobs/{IGNORED_ID}" + params["clob-id"] = clob_id + else: + endpoint = f"clobs/{clob_id}" + params["office"] = office_id return api.delete(endpoint, params=params, api_version=1) -def update_clob(data: JSON, clob_id: str, ignore_nulls: Optional[bool] = True) -> None: +def update_clob( + data: JSON, clob_id: Optional[str] = None, ignore_nulls: Optional[bool] = True +) -> None: """Updates clob Parameters @@ -110,7 +120,7 @@ def update_clob(data: JSON, clob_id: str, ignore_nulls: Optional[bool] = True) - "value": "string" } clob_id: string - Specifies the id of the clob to be deleted + Specifies the id of the clob to be deleted. Unused if "id" is present in JSON data. ignore_nulls: Boolean If true, null and empty fields in the provided clob will be ignored and the existing value of those fields left in place. Default: true @@ -122,8 +132,19 @@ def update_clob(data: JSON, clob_id: str, ignore_nulls: Optional[bool] = True) - if not isinstance(data, dict): raise ValueError("Cannot store a Clob without a JSON data dictionary") - endpoint = f"clobs/{clob_id}" - params = {"ignore-nulls": ignore_nulls} + if "id" in data: + clob_id = data.get("id", "").upper() + + if clob_id is None: + raise ValueError(f"Cannot update a Clob without an 'id' field:\n{STORE_DICT}") + + params: dict[str, Any] = {} + if has_invalid_chars(clob_id): + endpoint = f"clobs/{IGNORED_ID}" + params["clob-id"] = clob_id + else: + endpoint = f"clobs/{clob_id}" + params["ignore-nulls"] = ignore_nulls return api.patch(endpoint, data, params, api_version=1) diff --git a/cwms/utils/checks.py b/cwms/utils/checks.py index e16e7235..e2f0dc15 100644 --- a/cwms/utils/checks.py +++ b/cwms/utils/checks.py @@ -8,3 +8,15 @@ def is_base64(s: str) -> bool: return base64.b64encode(decoded).decode("utf-8") == s except (ValueError, TypeError): return False + + +def has_invalid_chars(id: str) -> bool: + """ + Checks if ID contains any invalid web path characters. 
+ """ + INVALID_PATH_CHARS = ["/", "\\", "&", "?", "="] + + for char in INVALID_PATH_CHARS: + if char in id: + return True + return False diff --git a/docker-compose.yml b/docker-compose.yml index 9079fbc2..26b39aee 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,22 +3,27 @@ volumes: auth_data: services: db: - image: ghcr.io/hydrologicengineeringcenter/cwms-database/cwms/database-ready-ora-23.5:latest-dev + image: ghcr.io/hydrologicengineeringcenter/cwms-database/cwms/database-ready-ora-23.5:develop-nightly environment: #- ORACLE_DATABASE=FREEPDB1 - ORACLE_PASSWORD=badSYSpassword - CWMS_PASSWORD=simplecwmspasswD1 - OFFICE_ID=HQ - OFFICE_EROC=s0 - ports: ["1526:1521"] + ports: + - "1526:1521" healthcheck: - test: ["CMD", "tnsping", "FREEPDB1"] + test: + [ + "CMD-SHELL", + "sqlplus -s -L sys/badSYSpassword@localhost:1521/FREEPDB1 as sysdba <<< 'exit;'", + ] interval: 30s timeout: 50s retries: 50 start_period: 40m db_webuser_permissions: - image: ghcr.io/hydrologicengineeringcenter/cwms-database/cwms/schema_installer:latest-dev + image: ${CWMS_SCHEMA_INSTALLER_IMAGE:-registry-public.hecdev.net/cwms/schema_installer:latest-dev} restart: "no" environment: - DB_HOST_PORT=db:1521 @@ -31,16 +36,12 @@ services: - INSTALLONCE=1 - QUIET=1 command: > - sh -xc "sqlplus CWMS_20/$$CWMS_PASSWORD@$$DB_HOST_PORT$$DB_NAME @/setup_sql/users - $$OFFICE_EROC" - volumes: [./compose_files/sql:/setup_sql:ro] + sh -xc "sqlplus CWMS_20/$$CWMS_PASSWORD@$$DB_HOST_PORT$$DB_NAME @/setup_sql/users $$OFFICE_EROC" + volumes: + - ./compose_files/sql:/setup_sql:ro depends_on: db: condition: service_healthy - auth: - condition: service_healthy - traefik: - condition: service_healthy data-api: depends_on: @@ -58,6 +59,7 @@ services: - ./compose_files/pki/certs:/conf/ - ./compose_files/tomcat/logging.properties:/usr/local/tomcat/conf/logging.properties:ro environment: + - JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 - CDA_JDBC_DRIVER=oracle.jdbc.driver.OracleDriver - CDA_JDBC_URL=jdbc:oracle:thin:@db/FREEPDB1 - CDA_JDBC_USERNAME=s0webtest @@ -73,9 +75,12 @@ services: - cwms.dataapi.access.openid.altAuthUrl=http://localhost:${APP_PORT:-8082} - cwms.dataapi.access.openid.useAltWellKnown=true - cwms.dataapi.access.openid.issuer=http://localhost:${APP_PORT:-8082}/auth/realms/cwms - expose: [7000] + expose: + - 7000 + - 5005 healthcheck: - test: ["CMD", "/usr/bin/curl", "-I", "localhost:7000/cwms-data/offices/HEC"] + test: + ["CMD", "/usr/bin/curl", "-I", "localhost:7000/cwms-data/offices/HEC"] interval: 5s timeout: 1s retries: 100 @@ -84,15 +89,13 @@ services: - "traefik.enable=true" - "traefik.http.routers.data-api.rule=PathPrefix(`/cwms-data`)" - "traefik.http.routers.data-api.entryPoints=web" + - "traefik.http.services.data-api.loadbalancer.server.port=7000" auth: image: quay.io/keycloak/keycloak:19.0.1 - command: ["start-dev", "--import-realm"] + command: ["start-dev", "--features-disabled=admin2", "--import-realm"] healthcheck: - test: - - "CMD-SHELL" - - "/usr/bin/curl -If localhost:${APP_PORT:-8082}/auth/health/ready || exit\ - \ 1" + test: "/usr/bin/curl -If localhost:${APP_PORT:-8082}/auth/health/ready || exit 1" interval: 5s timeout: 1s retries: 100 @@ -108,8 +111,6 @@ services: - KC_PROXY=none - KC_HTTP_ENABLED=true - KC_HTTP_RELATIVE_PATH=/auth - - KC_HOSTNAME=localhost - - KC_DB=dev-file volumes: - ./compose_files/keycloak/realm.json:/opt/keycloak/data/import/realm.json:ro labels: @@ -123,10 +124,13 @@ services: # Proxy for HTTPS for OpenID traefik: - image: traefik:v3.6.2 
- ports: ["${APP_PORT:-8082}:80"] - expose: ["8080"] - volumes: ["/var/run/docker.sock:/var/run/docker.sock:ro"] + image: "traefik:v3.6.2" + ports: + - "${APP_PORT:-8082}:80" + expose: + - "8081" + volumes: + - "/var/run/docker.sock:/var/run/docker.sock:ro" healthcheck: test: traefik healthcheck --ping command: diff --git a/tests/cda/blobs/blob_CDA_test.py b/tests/cda/blobs/blob_CDA_test.py deleted file mode 100644 index 6e67606e..00000000 --- a/tests/cda/blobs/blob_CDA_test.py +++ /dev/null @@ -1,162 +0,0 @@ -# tests/test_blob.py -from __future__ import annotations - -import base64 -import mimetypes -from datetime import datetime, timezone -from pathlib import Path -from typing import Optional - -import pandas as pd -import pytest - -import cwms.catalog.blobs as blobs - -TEST_OFFICE = "MVP" -TEST_BLOB_ID = "PYTEST_BLOB_ALPHA" -TEST_BLOB_UPDATED_ID = TEST_BLOB_ID # keeping same id; update modifies fields -TEST_MEDIA_TYPE = "text/plain" -TEST_DESC = "pytest blob ? initial" -TEST_DESC_UPDATED = "pytest blob ? updated" -TEST_TEXT = "Hello from pytest @ " + datetime.now(timezone.utc).isoformat( - timespec="seconds" -) -TEST_TEXT_UPDATED = TEST_TEXT + " (edited)" - - -@pytest.fixture(scope="module", autouse=True) -def ensure_clean_slate(): - """Delete the test blob (if it exists) before/after running this module.""" - try: - blobs.delete_blob(office_id=TEST_OFFICE, blob_id=TEST_BLOB_ID) - except Exception: - pass - yield - try: - blobs.delete_blob(office_id=TEST_OFFICE, blob_id=TEST_BLOB_ID) - except Exception: - pass - - -@pytest.fixture(autouse=True) -def init_session(request): - print("Initializing CWMS API session for blob tests...") - - -def _find_blob_row(office: str, blob_id: str) -> Optional[pd.Series]: - """ - Helper: return the row for blob_id from cwms.get_blobs(...).df if present. 
- """ - res = blobs.get_blobs(office_id=office, blob_id_like=blob_id) - df = res if isinstance(res, pd.DataFrame) else getattr(res, "df", None) - if df is None or df.empty: - return None - # normalize id column name if needed id or blob-id - if "id" not in df.columns and "blob-id" in df.columns: - df = df.rename(columns={"blob-id": "id"}) - match = df[df["id"].str.upper() == blob_id.upper()] - return match.iloc[0] if not match.empty else None - - -def test_store_blob_excel(): - # Create an empty file with the excel extension - excel_file_path = Path(__file__).parent.parent / "resources" / "blob_test.xlsx" - with open(excel_file_path, "rb") as f: - file_data = f.read() - - # Get the file extension and decide which type to use if xlsx or xlx - ext = excel_file_path.suffix.lower() - mime_type = mimetypes.guess_type(excel_file_path.name)[0] - # Some linux systems may not have the excel mimetypes registered - if ext == ".xlsx": - mime_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" - elif ext == ".xls": - mime_type = "application/vnd.ms-excel" - - excel_blob_id = "TEST_BLOB_EXCEL" - payload = { - "office-id": TEST_OFFICE, - "id": excel_blob_id, - "description": "testing excel file", - "media-type-id": mime_type, - "value": base64.b64encode(file_data).decode("utf-8"), - } - blobs.store_blobs(data=payload) - row = _find_blob_row(TEST_OFFICE, excel_blob_id) - assert row is not None, "Stored blob not found in listing" - - -def test_get_excel_blob(): - # Retrieve the excel blob stored in the previous test - excel_blob_id = "TEST_BLOB_EXCEL" - content = blobs.get_blob(office_id=TEST_OFFICE, blob_id=excel_blob_id) - assert content is not None, "Failed to retrieve excel blob" - assert len(content) > 0, "Excel blob content is empty" - - -def test_store_blob(): - # Build request JSON for store_blobs - payload = { - "office-id": TEST_OFFICE, - "id": TEST_BLOB_ID, - "description": TEST_DESC, - "media-type-id": TEST_MEDIA_TYPE, - "value": TEST_TEXT, - } - blobs.store_blobs(payload, fail_if_exists=True) - - # Verify via listing metadata - row = _find_blob_row(TEST_OFFICE, TEST_BLOB_ID) - assert row is not None, "Stored blob not found in listing" - assert str(row["id"]).upper() == TEST_BLOB_ID - if "media-type-id" in row.index: - assert row["media-type-id"] == TEST_MEDIA_TYPE - if "description" in row.index: - assert TEST_DESC in str(row["description"]) - - # Verify content by downloading - content = blobs.get_blob(office_id=TEST_OFFICE, blob_id=TEST_BLOB_ID) - assert isinstance(content, str) and content, "Empty blob content" - assert TEST_TEXT in content - - -def test_get_blob(): - # Do a simple read of the blob created in test_store_blob - content = blobs.get_blob(office_id=TEST_OFFICE, blob_id=TEST_BLOB_ID) - assert TEST_TEXT in content - assert len(content) >= len(TEST_TEXT) - - -def test_update_blob(): - # Test updating all fields - update = { - "office-id": TEST_OFFICE, - "id": TEST_BLOB_UPDATED_ID, - "description": TEST_DESC_UPDATED, - "media-type-id": TEST_MEDIA_TYPE, - "value": TEST_TEXT_UPDATED, - } - blobs.update_blob(update, fail_if_not_exists=True) - - # Confirm updated metadata - row = _find_blob_row(TEST_OFFICE, TEST_BLOB_UPDATED_ID) - assert row is not None, "Updated blob not found" - if "description" in row.index: - assert TEST_DESC_UPDATED in str(row["description"]) - - # Verify new content - content = blobs.get_blob(office_id=TEST_OFFICE, blob_id=TEST_BLOB_UPDATED_ID) - assert TEST_TEXT_UPDATED in content - - -def test_delete_blobs(): - # Delete the test blob - 
blobs.delete_blob(office_id=TEST_OFFICE, blob_id=TEST_BLOB_ID) - blobs.delete_blob(office_id=TEST_OFFICE, blob_id="TEST_BLOB_EXCEL") - - # Confirm deletion via listing - row = _find_blob_row(TEST_OFFICE, TEST_BLOB_ID) - assert row is None, "Blob still found after deletion" - - row = _find_blob_row(TEST_OFFICE, "TEST_BLOB_EXCEL") - assert row is None, "Excel blob still found after deletion" diff --git a/tests/cda/blobs/test_blobs_binary.py b/tests/cda/blobs/test_blobs_binary.py new file mode 100644 index 00000000..ea7f071e --- /dev/null +++ b/tests/cda/blobs/test_blobs_binary.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +import base64 +import mimetypes +from pathlib import Path + +import pytest + +import cwms.catalog.blobs as blobs + +TEST_OFFICE = "MVP" +EXCEL_BLOB_ID = "PYTEST_BLOB_EXCEL" + + +def _excel_mime_type(path: Path) -> str: + ext = path.suffix.lower() + guessed = mimetypes.guess_type(path.name)[0] + if ext == ".xlsx": + return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + if ext == ".xls": + return "application/vnd.ms-excel" + return guessed or "application/octet-stream" + + +@pytest.fixture +def excel_payload() -> dict: + excel_file_path = Path(__file__).parent / ".." / "resources" / "blob_test.xlsx" + with open(excel_file_path, "rb") as f: + file_data = f.read() + + return { + "office-id": TEST_OFFICE, + "id": EXCEL_BLOB_ID, + "description": "pytest excel blob", + "media-type-id": _excel_mime_type(excel_file_path), + "value": base64.b64encode(file_data).decode("utf-8"), + } + + +@pytest.fixture +def stored_excel_blob(excel_payload): + # ensure clean start + try: + blobs.delete_blob(office_id=TEST_OFFICE, blob_id=EXCEL_BLOB_ID) + except Exception: + pass + + blobs.store_blobs(data=excel_payload, fail_if_exists=False) + yield + + # always cleanup + try: + blobs.delete_blob(office_id=TEST_OFFICE, blob_id=EXCEL_BLOB_ID) + except Exception: + pass + + +def test_store_blob_excel_creates_blob(stored_excel_blob): + # If store_blobs didn't throw, we consider it created. + # If you want a stronger assertion, use your shared find_row helper from Option A. 
+ assert True + + +def test_get_excel_blob_returns_content(stored_excel_blob): + content = blobs.get_blob(office_id=TEST_OFFICE, blob_id=EXCEL_BLOB_ID) + assert content is not None + assert len(content) > 0 diff --git a/tests/cda/catalog/catalog_resources.py b/tests/cda/catalog/catalog_resources.py new file mode 100644 index 00000000..d5e6a2d8 --- /dev/null +++ b/tests/cda/catalog/catalog_resources.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Callable, Optional + +import pandas as pd + + +@dataclass(frozen=True) +class CatalogResource: + name: str + id_key: str # "blob_id" or "clob_id" for delete/get args + id_col_fallback: str # "blob-id" or "clob-id" in listing df + list_fn: Callable[..., Any] # get_blobs/get_clobs + store_fn: Callable[..., Any] + get_fn: Callable[..., Any] # get_blob/get_clob + update_fn: Callable[..., Any] + delete_fn: Callable[..., Any] + extract_content: Callable[[Any], str] # how to get string content from get() + + +def df_from_result(res: Any) -> Optional[pd.DataFrame]: + if isinstance(res, pd.DataFrame): + return res + return getattr(res, "df", None) + + +def find_row(resource: CatalogResource, office: str, item_id: str) -> Optional[pd.Series]: + res = resource.list_fn(office_id=office, **{f"{resource.id_key}_like": item_id}) + df = df_from_result(res) + if df is None or df.empty: + return None + if "id" not in df.columns and resource.id_col_fallback in df.columns: + df = df.rename(columns={resource.id_col_fallback: "id"}) + match = df[df["id"].astype(str).str.upper() == item_id.upper()] + return match.iloc[0] if not match.empty else None diff --git a/tests/cda/catalog/lob_CDA_test.py b/tests/cda/catalog/lob_CDA_test.py new file mode 100644 index 00000000..c850ca80 --- /dev/null +++ b/tests/cda/catalog/lob_CDA_test.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest + +import cwms.catalog.blobs as blobs +import cwms.catalog.clobs as clobs + +from catalog_resources import CatalogResource, find_row + + +def _blob_content(x) -> str: + # blobs.get_blob returns a string in your current test + return x + + +def _clob_content(x) -> str: + # clobs.get_clob returns Data with json["value"] + return x.json["value"] + + +RESOURCES = [ + CatalogResource( + name="blob", + id_key="blob_id", + id_col_fallback="blob-id", + list_fn=blobs.get_blobs, + store_fn=blobs.store_blobs, + get_fn=blobs.get_blob, + update_fn=blobs.update_blob, + delete_fn=blobs.delete_blob, + extract_content=_blob_content, + ), + CatalogResource( + name="clob", + id_key="clob_id", + id_col_fallback="clob-id", + list_fn=clobs.get_clobs, + store_fn=clobs.store_clobs, + get_fn=clobs.get_clob, + update_fn=clobs.update_clob, + delete_fn=clobs.delete_clob, + extract_content=_clob_content, + ), +] + + +@pytest.fixture(params=RESOURCES, ids=[r.name for r in RESOURCES]) +def resource(request) -> CatalogResource: + return request.param + + +@pytest.fixture +def test_constants(resource: CatalogResource): + office = "MVP" + item_id = f"/PYTEST/{resource.name.upper()}/ALPHA" + desc = f"pytest {resource.name} ? initial" + desc_updated = f"pytest {resource.name} ? 
updated" + text = "Hello from pytest @ " + datetime.now(timezone.utc).isoformat( + timespec="seconds" + ) + text_updated = text + " (edited)" + return office, item_id, desc, desc_updated, text, text_updated + + +@pytest.fixture(autouse=True) +def ensure_clean_slate(resource: CatalogResource, test_constants): + office, item_id, *_ = test_constants + try: + resource.delete_fn(office_id=office, **{resource.id_key: item_id}) + except Exception: + pass + yield + try: + resource.delete_fn(office_id=office, **{resource.id_key: item_id}) + except Exception: + pass + + +def test_store(resource: CatalogResource, test_constants): + office, item_id, desc, _, text, _ = test_constants + + payload = {"office-id": office, "id": item_id, "description": desc, "value": text} + # blob needs media-type-id; clob does not + if resource.name == "blob": + payload["media-type-id"] = "text/plain" + + resource.store_fn(payload, fail_if_exists=True) + + row = find_row(resource, office, item_id) + assert row is not None, f"Stored {resource.name} not found in listing" + assert str(row["id"]).upper() == item_id.upper() + if "description" in row.index: + assert desc in str(row["description"]) + + got = resource.get_fn(office_id=office, **{resource.id_key: item_id}) + content = resource.extract_content(got) + assert isinstance(content, str) and content + assert text in content + + +def test_get(resource: CatalogResource, test_constants): + office, item_id, desc, _, text, _ = test_constants + + # Ensure it exists (keeps tests order-independent) + payload = {"office-id": office, "id": item_id, "description": desc, "value": text} + if resource.name == "blob": + payload["media-type-id"] = "text/plain" + resource.store_fn(payload, fail_if_exists=False) + + got = resource.get_fn(office_id=office, **{resource.id_key: item_id}) + content = resource.extract_content(got) + assert text in content + assert len(content) >= len(text) + + +def test_update(resource: CatalogResource, test_constants): + office, item_id, desc, desc_updated, text, text_updated = test_constants + + # Ensure it exists + payload = {"office-id": office, "id": item_id, "description": desc, "value": text} + if resource.name == "blob": + payload["media-type-id"] = "text/plain" + resource.store_fn(payload, fail_if_exists=False) + + update = { + "office-id": office, + "id": item_id, + "description": desc_updated, + "value": text_updated, + } + + # your current APIs differ slightly here + if resource.name == "blob": + update["media-type-id"] = "text/plain" + resource.update_fn(update, fail_if_not_exists=True) + else: + resource.update_fn(update, ignore_nulls=True) + + row = find_row(resource, office, item_id) + assert row is not None, f"Updated {resource.name} not found" + if "description" in row.index: + assert desc_updated in str(row["description"]) + + got = resource.get_fn(office_id=office, **{resource.id_key: item_id}) + content = resource.extract_content(got) + assert text_updated in content