From 703f57e25632122e793e0964955ccdcb21dcce14 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Mon, 23 Feb 2026 15:15:42 -0800 Subject: [PATCH 1/6] Fixes custom schema processing --- .../transact/database/NotificationsDAO.java | 14 +- .../dev/dbos/transact/database/QueuesDAO.java | 12 +- .../dev/dbos/transact/database/StepsDAO.java | 8 +- .../transact/database/SystemDatabase.java | 38 ++-- .../dbos/transact/database/WorkflowDAO.java | 44 ++--- .../transact/migrations/MigrationManager.java | 116 +++++++----- .../transact/invocation/CustomSchemaTest.java | 2 +- .../migrations/MigrationManagerTest.java | 174 +++++++++++++++++- .../java/dev/dbos/transact/utils/DBUtils.java | 12 +- 9 files changed, 309 insertions(+), 111 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java b/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java index 290454e8..719bc1ed 100644 --- a/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java @@ -74,7 +74,7 @@ void send( // Insert notification final String sql = """ - INSERT INTO %s.notifications (destination_uuid, topic, message) VALUES (?, ?, ?) + INSERT INTO "%s".notifications (destination_uuid, topic, message) VALUES (?, ?, ?) """ .formatted(this.schema); @@ -162,7 +162,7 @@ Object recv( try (Connection conn = dataSource.getConnection()) { final String sql = """ - SELECT topic FROM %s.notifications WHERE destination_uuid = ? AND topic = ? + SELECT topic FROM "%s".notifications WHERE destination_uuid = ? AND topic = ? """ .formatted(this.schema); @@ -214,12 +214,12 @@ Object recv( """ WITH oldest_entry AS ( SELECT destination_uuid, topic, message, created_at_epoch_ms - FROM %1$s.notifications + FROM "%1$s".notifications WHERE destination_uuid = ? AND topic = ? ORDER BY created_at_epoch_ms ASC LIMIT 1 ) - DELETE FROM %1$s.notifications + DELETE FROM "%1$s".notifications WHERE destination_uuid = (SELECT destination_uuid FROM oldest_entry) AND topic = (SELECT topic FROM oldest_entry) AND created_at_epoch_ms = (SELECT created_at_epoch_ms FROM oldest_entry) @@ -263,7 +263,7 @@ private void setEvent( throws SQLException { final String eventSql = """ - INSERT INTO %s.workflow_events (workflow_uuid, key, value) + INSERT INTO "%s".workflow_events (workflow_uuid, key, value) VALUES (?, ?, ?) ON CONFLICT (workflow_uuid, key) DO UPDATE SET value = EXCLUDED.value @@ -279,7 +279,7 @@ ON CONFLICT (workflow_uuid, key) final String eventHistorySql = """ - INSERT INTO %s.workflow_events_history (workflow_uuid, function_id, key, value) + INSERT INTO "%s".workflow_events_history (workflow_uuid, function_id, key, value) VALUES (?, ?, ?, ?) ON CONFLICT (workflow_uuid, key, function_id) DO UPDATE SET value = EXCLUDED.value @@ -382,7 +382,7 @@ Object getEvent( Object value = null; final String sql = """ - SELECT value FROM %s.workflow_events WHERE workflow_uuid = ? AND key = ? + SELECT value FROM "%s".workflow_events WHERE workflow_uuid = ? AND key = ? """ .formatted(this.schema); diff --git a/transact/src/main/java/dev/dbos/transact/database/QueuesDAO.java b/transact/src/main/java/dev/dbos/transact/database/QueuesDAO.java index cb0298bb..039958aa 100644 --- a/transact/src/main/java/dev/dbos/transact/database/QueuesDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/QueuesDAO.java @@ -62,7 +62,7 @@ List getAndStartQueuedWorkflows( var limiterQuery = """ SELECT COUNT(*) - FROM %s.workflow_status + FROM "%s".workflow_status WHERE queue_name = ? AND status != ? AND started_at_epoch_ms > ? @@ -100,7 +100,7 @@ SELECT COUNT(*) String pendingQuery = """ SELECT executor_id, COUNT(*) as task_count - FROM %s.workflow_status + FROM "%s".workflow_status WHERE queue_name = ? AND status = ? """ .formatted(this.schema); @@ -170,7 +170,7 @@ SELECT executor_id, COUNT(*) as task_count var query = """ SELECT workflow_uuid - FROM %s.workflow_status + FROM "%s".workflow_status WHERE queue_name = ? AND status = ? AND (application_version = ? OR application_version IS NULL) @@ -226,7 +226,7 @@ SELECT executor_id, COUNT(*) as task_count List updatedWorkflowIds = new ArrayList<>(); String updateQuery = """ - UPDATE %s.workflow_status + UPDATE "%s".workflow_status SET status = ?, application_version = ?, executor_id = ?, @@ -273,7 +273,7 @@ boolean clearQueueAssignment(String workflowId) throws SQLException { final String sql = """ - UPDATE %s.workflow_status + UPDATE "%s".workflow_status SET started_at_epoch_ms = NULL, status = ? WHERE workflow_uuid = ? AND queue_name IS NOT NULL AND status = ? """ @@ -294,7 +294,7 @@ List getQueuePartitions(String queueName) throws SQLException { final String sql = """ SELECT DISTINCT queue_partition_key - FROM %s.workflow_status + FROM "%s".workflow_status WHERE queue_name = ? AND status = ? AND queue_partition_key IS NOT NULL diff --git a/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java b/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java index b67ebaeb..de475291 100644 --- a/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java @@ -55,7 +55,7 @@ static void recordStepResultTxn( Objects.requireNonNull(schema); String sql = """ - INSERT INTO %s.operation_outputs + INSERT INTO "%s".operation_outputs (workflow_uuid, function_id, function_name, output, error, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING RETURNING completed_at_epoch_ms @@ -132,7 +132,7 @@ static StepResult checkStepExecutionTxn( Objects.requireNonNull(schema); final String sql = """ - SELECT status FROM %s.workflow_status WHERE workflow_uuid = ? + SELECT status FROM "%s".workflow_status WHERE workflow_uuid = ? """ .formatted(schema); @@ -158,7 +158,7 @@ static StepResult checkStepExecutionTxn( String operationOutputSql = """ SELECT output, error, function_name - FROM %s.operation_outputs + FROM "%s".operation_outputs WHERE workflow_uuid = ? AND function_id = ? """ .formatted(schema); @@ -203,7 +203,7 @@ List listWorkflowSteps(Connection connection, String workflowId) throw final String sql = """ SELECT function_id, function_name, output, error, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms - FROM %s.operation_outputs + FROM "%s".operation_outputs WHERE workflow_uuid = ? ORDER BY function_id; """ diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index f9d4f1c8..67cb14cc 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -37,11 +37,9 @@ public class SystemDatabase implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(SystemDatabase.class); public static String sanitizeSchema(String schema) { - schema = - Objects.requireNonNullElse(schema, Constants.DB_SCHEMA) - .replace("\0", "") - .replace("\"", "\"\""); - return "\"%s\"".formatted(schema); + return Objects.requireNonNullElse(schema, Constants.DB_SCHEMA) + .replace("\0", "") + .replace("\"", "\"\""); } private final DataSource dataSource; @@ -425,7 +423,7 @@ public Optional getExternalState(String service, String workflowN () -> { final String sql = """ - SELECT value, update_seq, update_time FROM %s.event_dispatch_kv WHERE service_name = ? AND workflow_fn_name = ? AND key = ? + SELECT value, update_seq, update_time FROM "%s".event_dispatch_kv WHERE service_name = ? AND workflow_fn_name = ? AND key = ? """ .formatted(this.schema); @@ -456,7 +454,7 @@ public ExternalState upsertExternalState(ExternalState state) { () -> { final var sql = """ - INSERT INTO %s.event_dispatch_kv ( + INSERT INTO "%s".event_dispatch_kv ( service_name, workflow_fn_name, key, value, update_time, update_seq) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT (service_name, workflow_fn_name, key) @@ -509,7 +507,7 @@ public List getMetrics(Instant startTime, Instant endTime) { final var wfSQL = """ SELECT name, COUNT(workflow_uuid) as count - FROM %s.workflow_status + FROM "%s".workflow_status WHERE created_at >= ? AND created_at < ? GROUP BY name """ @@ -517,7 +515,7 @@ SELECT name, COUNT(workflow_uuid) as count final var stepSQL = """ SELECT function_name, COUNT(*) as count - FROM %s.operation_outputs + FROM "%s".operation_outputs WHERE completed_at_epoch_ms >= ? AND completed_at_epoch_ms < ? GROUP BY function_name """ @@ -559,7 +557,7 @@ private String getCheckpointName(Connection conn, String workflowId, int functio var sql = """ SELECT function_name - FROM %s.operation_outputs + FROM "%s".operation_outputs WHERE workflow_uuid = ? AND function_id = ? """ .formatted(this.schema); @@ -613,7 +611,7 @@ public void deleteWorkflows(String... workflowIds) { var sql = """ - DELETE FROM %s.workflow_status + DELETE FROM "%s".workflow_status WHERE workflow_uuid = ANY(?); """ .formatted(this.schema); @@ -642,7 +640,7 @@ List getWorkflowChildrenInternal(String workflowId) throws SQLException var sql = """ SELECT child_workflow_id - FROM %s.operation_outputs + FROM "%s".operation_outputs WHERE workflow_uuid = ? AND child_workflow_id IS NOT NULL """ .formatted(this.schema); @@ -673,7 +671,7 @@ List listWorkflowEvents(Connection conn, String workflowId) throw var sql = """ SELECT key, value - FROM %s.workflow_events + FROM "%s".workflow_events WHERE workflow_uuid = ? """ .formatted(this.schema); @@ -697,7 +695,7 @@ List listWorkflowEventHistory(Connection conn, String work var sql = """ SELECT key, value, function_id - FROM %s.workflow_events_history + FROM "%s".workflow_events_history WHERE workflow_uuid = ? """ .formatted(this.schema); @@ -721,7 +719,7 @@ List listWorkflowStreams(Connection conn, String workflowId) thr var sql = """ SELECT key, value, "offset", function_id - FROM %s.streams + FROM "%s".streams WHERE workflow_uuid = ? """ .formatted(this.schema); @@ -771,7 +769,7 @@ public List exportWorkflow(String workflowId, boolean exportCh public void importWorkflow(List workflows) { var wfSQL = """ - INSERT INTO %s.workflow_status ( + INSERT INTO "%s".workflow_status ( workflow_uuid, status, name, class_name, config_name, authenticated_user, assumed_role, authenticated_roles, @@ -789,7 +787,7 @@ public void importWorkflow(List workflows) { var stepSQL = """ - INSERT INTO %s.operation_outputs ( + INSERT INTO "%s".operation_outputs ( workflow_uuid, function_id, function_name, output, error, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms @@ -801,7 +799,7 @@ public void importWorkflow(List workflows) { var eventSQL = """ - INSERT INTO %s.workflow_events ( + INSERT INTO "%s".workflow_events ( workflow_uuid, key, value ) VALUES ( ?, ?, ? @@ -811,7 +809,7 @@ public void importWorkflow(List workflows) { var eventHistorySQL = """ - INSERT INTO %s.workflow_events_history ( + INSERT INTO "%s".workflow_events_history ( workflow_uuid, key, value, function_id ) VALUES ( ?, ?, ?, ? @@ -821,7 +819,7 @@ public void importWorkflow(List workflows) { var streamsSQL = """ - INSERT INTO %s.streams ( + INSERT INTO "%s".streams ( workflow_uuid, key, value, function_id, offset ) VALUES ( ?, ?, ?, ?, ? diff --git a/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java b/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java index 80bbe26d..a8697f16 100644 --- a/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java @@ -102,7 +102,7 @@ WorkflowInitResult initWorkflowStatus( var sql = """ - UPDATE %s.workflow_status + UPDATE "%s".workflow_status SET status = ?, deduplication_id = NULL, started_at_epoch_ms = NULL, queue_name = NULL WHERE workflow_uuid = ? AND status = ? """ @@ -162,7 +162,7 @@ InsertWorkflowResult insertWorkflowStatus( String insertSQL = """ - INSERT INTO %s.workflow_status ( + INSERT INTO "%s".workflow_status ( workflow_uuid, status, inputs, name, class_name, config_name, queue_name, deduplication_id, priority, queue_partition_key, @@ -269,7 +269,7 @@ void updateWorkflowOutcome( // Note that transitions from CANCELLED to SUCCESS or ERROR are forbidden var sql = """ - UPDATE %s.workflow_status + UPDATE "%s".workflow_status SET status = ?, output = ?, error = ?, updated_at = ?, deduplication_id = NULL WHERE workflow_uuid = ? AND NOT (status = ? AND ? in (?, ?)) """ @@ -336,7 +336,7 @@ WorkflowStatus getWorkflowStatus(Connection conn, String workflowId) throws SQLE created_at, updated_at, recovery_attempts, started_at_epoch_ms, workflow_timeout_ms, workflow_deadline_epoch_ms, forked_from, parent_workflow_id - FROM %s.workflow_status + FROM "%s".workflow_status WHERE workflow_uuid = ? """ .formatted(this.schema); @@ -388,7 +388,7 @@ List listWorkflows(ListWorkflowsInput input) throws SQLException sqlBuilder.append(", output, error"); } - sqlBuilder.append(" FROM %s.workflow_status ".formatted(this.schema)); + sqlBuilder.append(" FROM \"%s\".workflow_status ".formatted(this.schema)); // --- WHERE Clauses --- StringJoiner whereConditions = new StringJoiner(" AND "); @@ -562,7 +562,7 @@ List getPendingWorkflows(String executorId, String ap final String sql = """ SELECT workflow_uuid, queue_name - FROM %s.workflow_status + FROM "%s".workflow_status WHERE status = ? AND executor_id = ? AND application_version = ? @@ -596,7 +596,7 @@ Result awaitWorkflowResult(String workflowId) throws SQLException { final String sql = """ SELECT status, output, error - FROM %s.workflow_status + FROM "%s".workflow_status WHERE workflow_uuid = ? """ .formatted(this.schema); @@ -660,7 +660,7 @@ Optional checkChildWorkflow(String workflowUuid, int functionId) throws final String sql = """ - SELECT child_workflow_id FROM %s.operation_outputs WHERE workflow_uuid = ? AND function_id = ? + SELECT child_workflow_id FROM "%s".operation_outputs WHERE workflow_uuid = ? AND function_id = ? """ .formatted(this.schema); @@ -687,7 +687,7 @@ void cancelWorkflow(String workflowId) throws SQLException { // Check the status of the workflow. If it is complete, do nothing. String checkStatusSql = """ - SELECT status FROM %s.workflow_status WHERE workflow_uuid = ? + SELECT status FROM "%s".workflow_status WHERE workflow_uuid = ? """ .formatted(this.schema); @@ -713,7 +713,7 @@ void cancelWorkflow(String workflowId) throws SQLException { // on String updateSql = """ - UPDATE %s.workflow_status + UPDATE "%s".workflow_status SET status = ?, queue_name = NULL, deduplication_id = NULL, @@ -833,7 +833,7 @@ private static void insertForkedWorkflowStatus( String sql = """ - INSERT INTO %s.workflow_status ( + INSERT INTO "%s".workflow_status ( workflow_uuid, status, name, class_name, config_name, application_version, application_id, authenticated_user, authenticated_roles, assumed_role, queue_name, inputs, workflow_timeout_ms, forked_from ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) @@ -870,10 +870,10 @@ private static void copyOperationOutputs( String stepOutputsSql = """ - INSERT INTO %1$s.operation_outputs + INSERT INTO "%1$s".operation_outputs (workflow_uuid, function_id, output, error, function_name, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms) SELECT ? as workflow_uuid, function_id, output, error, function_name, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms - FROM %1$s.operation_outputs + FROM "%1$s".operation_outputs WHERE workflow_uuid = ? AND function_id < ? """ .formatted(schema); @@ -888,10 +888,10 @@ private static void copyOperationOutputs( var eventHistorySql = """ - INSERT INTO %1$s.workflow_events_history + INSERT INTO "%1$s".workflow_events_history (workflow_uuid, function_id, key, value) SELECT ? as workflow_uuid, function_id, key, value - FROM %1$s.workflow_events_history + FROM "%1$s".workflow_events_history WHERE workflow_uuid = ? AND function_id < ? """ .formatted(schema); @@ -906,14 +906,14 @@ private static void copyOperationOutputs( var eventSql = """ - INSERT INTO %1$s.workflow_events + INSERT INTO "%1$s".workflow_events (workflow_uuid, key, value) SELECT ?, weh1.key, weh1.value - FROM %1$s.workflow_events_history weh1 + FROM "%1$s".workflow_events_history weh1 WHERE weh1.workflow_uuid = ? AND weh1.function_id = ( SELECT MAX(weh2.function_id) - FROM %1$s.workflow_events_history weh2 + FROM "%1$s".workflow_events_history weh2 WHERE weh2.workflow_uuid = ? AND weh2.key = weh1.key AND weh2.function_id < ? @@ -936,7 +936,7 @@ private static String getWorkflowStatus(Connection connection, String workflowId throws SQLException { String sql = """ - SELECT status FROM %s.workflow_status WHERE workflow_uuid = ? + SELECT status FROM "%s".workflow_status WHERE workflow_uuid = ? """ .formatted(schema); @@ -956,7 +956,7 @@ private static void updateWorkflowToEnqueued( Connection connection, String workflowId, String schema) throws SQLException { String sql = """ - UPDATE %s.workflow_status + UPDATE "%s".workflow_status SET status = ?, queue_name = ?, recovery_attempts = ?, workflow_deadline_epoch_ms = 0, deduplication_id = NULL, started_at_epoch_ms = NULL WHERE workflow_uuid = ? """ @@ -976,7 +976,7 @@ private static Long getRowsCutoff(Connection connection, long rowsThreshold, Str throws SQLException { String sql = """ - SELECT created_at FROM %s.workflow_status ORDER BY created_at DESC OFFSET ? LIMIT 1 + SELECT created_at FROM "%s".workflow_status ORDER BY created_at DESC OFFSET ? LIMIT 1 """ .formatted(schema); try (PreparedStatement stmt = connection.prepareStatement(sql)) { @@ -1006,7 +1006,7 @@ void garbageCollect(Long cutoffEpochTimestampMs, Long rowsThreshold) throws SQLE if (cutoffEpochTimestampMs != null) { String sql = """ - DELETE FROM %s.workflow_status WHERE created_at < ? AND status NOT IN (?, ?) + DELETE FROM "%s".workflow_status WHERE created_at < ? AND status NOT IN (?, ?) """ .formatted(this.schema); try (PreparedStatement stmt = connection.prepareStatement(sql)) { diff --git a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java index 4abb6bca..ae595526 100644 --- a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java +++ b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java @@ -132,7 +132,7 @@ public static void ensureDbosSchema(Connection conn, String schema) { } try (var stmt = conn.createStatement()) { - stmt.execute("CREATE SCHEMA IF NOT EXISTS %s".formatted(schema)); + stmt.execute("CREATE SCHEMA IF NOT EXISTS \"%s\"".formatted(schema)); } catch (SQLException e) { logger.warn("SQLException thrown creating the {} schema", schema, e); } @@ -155,7 +155,7 @@ public static void ensureMigrationTable(Connection conn, String schema) { try (var stmt = conn.createStatement()) { stmt.execute( - "CREATE TABLE IF NOT EXISTS %s.dbos_migrations (version BIGINT NOT NULL PRIMARY KEY)" + "CREATE TABLE IF NOT EXISTS \"%s\".dbos_migrations (version BIGINT NOT NULL PRIMARY KEY)" .formatted(schema)); } catch (SQLException e) { logger.warn("SQLException thrown creating the dbos_migrations table", e); @@ -165,7 +165,7 @@ public static void ensureMigrationTable(Connection conn, String schema) { public static int getCurrentSysDbVersion(Connection conn, String schema) { Objects.requireNonNull(schema, "schema must not be null"); var sql = - "SELECT version FROM %s.dbos_migrations ORDER BY version DESC limit 1".formatted(schema); + "SELECT version FROM \"%s\".dbos_migrations ORDER BY version DESC limit 1".formatted(schema); try (var stmt = conn.createStatement(); var rs = stmt.executeQuery(sql)) { if (rs.next()) { @@ -204,14 +204,14 @@ static void runDbosMigrations(Connection conn, String schema, List migra try { int rowCount = 0; - var updateSQL = "UPDATE %s.dbos_migrations SET version = ?".formatted(schema); + var updateSQL = "UPDATE \"%s\".dbos_migrations SET version = ?".formatted(schema); try (var stmt = conn.prepareStatement(updateSQL)) { stmt.setLong(1, migrationIndex); rowCount = stmt.executeUpdate(); } if (rowCount == 0) { - var insertSql = "INSERT INTO %s.dbos_migrations (version) VALUES (?)".formatted(schema); + var insertSql = "INSERT INTO \"%s\".dbos_migrations (version) VALUES (?)".formatted(schema); try (var stmt = conn.prepareStatement(insertSql)) { stmt.setLong(1, migrationIndex); stmt.executeUpdate(); @@ -236,7 +236,9 @@ public static List getMigrations(String schema) { migration5, migration6, migration7, - migration8); + migration8, + migration9, + migration10); return migrations.stream().map(m -> m.formatted(schema)).toList(); } @@ -244,7 +246,7 @@ public static List getMigrations(String schema) { """ CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; - CREATE TABLE %1$s.workflow_status ( + CREATE TABLE "%1$s".workflow_status ( workflow_uuid TEXT PRIMARY KEY, status TEXT, name TEXT, @@ -268,42 +270,42 @@ config_name VARCHAR(255) DEFAULT NULL, inputs TEXT, started_at_epoch_ms BIGINT, deduplication_id TEXT, - priority INTEGER NOT NULL DEFAULT 0 + priority INT4 NOT NULL DEFAULT 0 ); - CREATE INDEX workflow_status_created_at_index ON %1$s.workflow_status (created_at); - CREATE INDEX workflow_status_executor_id_index ON %1$s.workflow_status (executor_id); - CREATE INDEX workflow_status_status_index ON %1$s.workflow_status (status); + CREATE INDEX workflow_status_created_at_index ON "%1$s".workflow_status (created_at); + CREATE INDEX workflow_status_executor_id_index ON "%1$s".workflow_status (executor_id); + CREATE INDEX workflow_status_status_index ON "%1$s".workflow_status (status); - ALTER TABLE %1$s.workflow_status + ALTER TABLE "%1$s".workflow_status ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id UNIQUE (queue_name, deduplication_id); - CREATE TABLE %1$s.operation_outputs ( + CREATE TABLE "%1$s".operation_outputs ( workflow_uuid TEXT NOT NULL, - function_id INTEGER NOT NULL, + function_id INT4 NOT NULL, function_name TEXT NOT NULL DEFAULT '', output TEXT, error TEXT, child_workflow_id TEXT, PRIMARY KEY (workflow_uuid, function_id), - FOREIGN KEY (workflow_uuid) REFERENCES %1$s.workflow_status(workflow_uuid) + FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid) ON UPDATE CASCADE ON DELETE CASCADE ); - CREATE TABLE %1$s.notifications ( + CREATE TABLE "%1$s".notifications ( + message_uuid TEXT NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY, -- Built-in function destination_uuid TEXT NOT NULL, topic TEXT, message TEXT NOT NULL, created_at_epoch_ms BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000::numeric)::bigint, - message_uuid TEXT NOT NULL DEFAULT gen_random_uuid(), -- Built-in function - FOREIGN KEY (destination_uuid) REFERENCES %1$s.workflow_status(workflow_uuid) + FOREIGN KEY (destination_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid) ON UPDATE CASCADE ON DELETE CASCADE ); - CREATE INDEX idx_workflow_topic ON %1$s.notifications (destination_uuid, topic); + CREATE INDEX idx_workflow_topic ON "%1$s".notifications (destination_uuid, topic); -- Create notification function - CREATE OR REPLACE FUNCTION %1$s.notifications_function() RETURNS TRIGGER AS $$ + CREATE OR REPLACE FUNCTION "%1$s".notifications_function() RETURNS TRIGGER AS $$ DECLARE payload text := NEW.destination_uuid || '::' || NEW.topic; BEGIN @@ -314,20 +316,20 @@ FOREIGN KEY (destination_uuid) REFERENCES %1$s.workflow_status(workflow_uuid) -- Create notification trigger CREATE TRIGGER dbos_notifications_trigger - AFTER INSERT ON %1$s.notifications - FOR EACH ROW EXECUTE FUNCTION %1$s.notifications_function(); + AFTER INSERT ON "%1$s".notifications + FOR EACH ROW EXECUTE FUNCTION "%1$s".notifications_function(); - CREATE TABLE %1$s.workflow_events ( + CREATE TABLE "%1$s".workflow_events ( workflow_uuid TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, PRIMARY KEY (workflow_uuid, key), - FOREIGN KEY (workflow_uuid) REFERENCES %1$s.workflow_status(workflow_uuid) + FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid) ON UPDATE CASCADE ON DELETE CASCADE ); -- Create events function - CREATE OR REPLACE FUNCTION %1$s.workflow_events_function() RETURNS TRIGGER AS $$ + CREATE OR REPLACE FUNCTION "%1$s".workflow_events_function() RETURNS TRIGGER AS $$ DECLARE payload text := NEW.workflow_uuid || '::' || NEW.key; BEGIN @@ -338,20 +340,20 @@ FOREIGN KEY (workflow_uuid) REFERENCES %1$s.workflow_status(workflow_uuid) -- Create events trigger CREATE TRIGGER dbos_workflow_events_trigger - AFTER INSERT ON %1$s.workflow_events - FOR EACH ROW EXECUTE FUNCTION %1$s.workflow_events_function(); + AFTER INSERT ON "%1$s".workflow_events + FOR EACH ROW EXECUTE FUNCTION "%1$s".workflow_events_function(); - CREATE TABLE %1$s.streams ( + CREATE TABLE "%1$s".streams ( workflow_uuid TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, - "offset" INTEGER NOT NULL, + "offset" INT4 NOT NULL, PRIMARY KEY (workflow_uuid, key, "offset"), - FOREIGN KEY (workflow_uuid) REFERENCES %1$s.workflow_status(workflow_uuid) + FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid) ON UPDATE CASCADE ON DELETE CASCADE ); - CREATE TABLE %1$s.event_dispatch_kv ( + CREATE TABLE "%1$s".event_dispatch_kv ( service_name TEXT NOT NULL, workflow_fn_name TEXT NOT NULL, key TEXT NOT NULL, @@ -364,47 +366,75 @@ PRIMARY KEY (service_name, workflow_fn_name, key) static final String migration2 = """ - ALTER TABLE %1$s.workflow_status ADD COLUMN queue_partition_key TEXT; + ALTER TABLE "%1$s".workflow_status ADD COLUMN queue_partition_key TEXT; """; static final String migration3 = """ - create index "idx_workflow_status_queue_status_started" on %1$s."workflow_status" ("queue_name", "status", "started_at_epoch_ms") + create index "idx_workflow_status_queue_status_started" on "%1$s"."workflow_status" ("queue_name", "status", "started_at_epoch_ms") """; static final String migration4 = """ - ALTER TABLE %1$s.workflow_status ADD COLUMN forked_from TEXT; - CREATE INDEX "idx_workflow_status_forked_from" ON %1$s."workflow_status" ("forked_from"); + ALTER TABLE "%1$s".workflow_status ADD COLUMN forked_from TEXT; + CREATE INDEX "idx_workflow_status_forked_from" ON "%1$s"."workflow_status" ("forked_from"); """; static final String migration5 = """ - ALTER TABLE %1$s.operation_outputs ADD COLUMN started_at_epoch_ms BIGINT, ADD COLUMN completed_at_epoch_ms BIGINT; + ALTER TABLE "%1$s".operation_outputs ADD COLUMN started_at_epoch_ms BIGINT, ADD COLUMN completed_at_epoch_ms BIGINT; """; static final String migration6 = """ - CREATE TABLE %1$s.workflow_events_history ( + CREATE TABLE "%1$s".workflow_events_history ( workflow_uuid TEXT NOT NULL, - function_id INTEGER NOT NULL, + function_id INT4 NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, PRIMARY KEY (workflow_uuid, function_id, key), - FOREIGN KEY (workflow_uuid) REFERENCES %1$s.workflow_status(workflow_uuid) + FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid) ON UPDATE CASCADE ON DELETE CASCADE ); - ALTER TABLE %1$s.streams ADD COLUMN function_id INTEGER NOT NULL DEFAULT 0; + ALTER TABLE "%1$s".streams ADD COLUMN function_id INT4 NOT NULL DEFAULT 0; """; static final String migration7 = """ - ALTER TABLE %1$s."workflow_status" ADD COLUMN "owner_xid" VARCHAR(40) DEFAULT NULL + ALTER TABLE "%1$s"."workflow_status" ADD COLUMN "owner_xid" VARCHAR(40) DEFAULT NULL """; static final String migration8 = """ - ALTER TABLE %1$s."workflow_status" ADD COLUMN "parent_workflow_id" TEXT DEFAULT NULL; - CREATE INDEX "idx_workflow_status_parent_workflow_id" ON %1$s."workflow_status" ("parent_workflow_id"); + ALTER TABLE "%1$s"."workflow_status" ADD COLUMN "parent_workflow_id" TEXT DEFAULT NULL; + CREATE INDEX "idx_workflow_status_parent_workflow_id" ON "%1$s"."workflow_status" ("parent_workflow_id"); + """; + + static final String migration9 = + """ + CREATE TABLE "%1$s".workflow_schedules ( + schedule_id TEXT PRIMARY KEY, + schedule_name TEXT NOT NULL UNIQUE, + workflow_name TEXT NOT NULL, + workflow_class_name TEXT, + schedule TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'ACTIVE', + context TEXT NOT NULL + ); + """; + + static final String migration10 = + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.table_constraints + WHERE table_schema = '%1$s' + AND table_name = 'notifications' + AND constraint_type = 'PRIMARY KEY' + ) THEN + ALTER TABLE "%1$s".notifications ADD PRIMARY KEY (message_uuid); + END IF; + END $$; """; } diff --git a/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java b/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java index 458c9e04..f044ec71 100644 --- a/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java +++ b/transact/src/test/java/dev/dbos/transact/invocation/CustomSchemaTest.java @@ -24,7 +24,7 @@ @org.junit.jupiter.api.Timeout(value = 2, unit = java.util.concurrent.TimeUnit.MINUTES) public class CustomSchemaTest { private static DBOSConfig dbosConfig; - private static final String schema = "C$+0m'"; + private static final String schema = "F8nny_sCHem@-n@m3"; private HawkService proxy; private HikariDataSource dataSource; private String localDate = LocalDate.now().format(DateTimeFormatter.ISO_DATE); diff --git a/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java b/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java index 3a139f58..b8b6600b 100644 --- a/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java +++ b/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java @@ -82,7 +82,7 @@ public static int getVersion(Connection conn) throws Exception { public static int getVersion(Connection conn, String schema) throws Exception { schema = SystemDatabase.sanitizeSchema(schema); - String sql = "SELECT version FROM %s.dbos_migrations".formatted(schema); + String sql = "SELECT version FROM \"%s\".dbos_migrations".formatted(schema); try (var stmt = conn.createStatement(); var rs = stmt.executeQuery(sql)) { assertTrue(rs.next()); @@ -95,7 +95,7 @@ public static int getVersion(Connection conn, String schema) throws Exception { @Test void testRunMigrations_customSchema() throws Exception { - String schema = "C\"$+0m'"; + var schema = "F8nny_sCHem@-n@m3"; dbosConfig = dbosConfig.withDatabaseSchema(schema); MigrationManager.runMigrations(dbosConfig); @@ -154,4 +154,174 @@ public void extractDbAndPostgresUrl() { assertEquals("dbos_java_sys", pair.database()); assertEquals("jdbc:postgresql://localhost:5432/postgres?user=alice&ssl=true", pair.url()); } + + @Test + void testOriginalMigration1ThenAllMigrations_NotificationsPrimaryKey() throws Exception { + try (Connection conn = dataSource.getConnection()) { + // Ensure schema and migration table exist + MigrationManager.ensureDbosSchema(conn, Constants.DB_SCHEMA); + MigrationManager.ensureMigrationTable(conn, Constants.DB_SCHEMA); + + // Run only the original migration1 (before primary key was added) to populate database with initial structure + var originalMigration1 = getOriginalMigration1().formatted(Constants.DB_SCHEMA); + try (var stmt = conn.createStatement()) { + stmt.execute(originalMigration1); + } + + // Update migration version to 1 + var insertSql = "INSERT INTO \"%s\".dbos_migrations (version) VALUES (1)".formatted(Constants.DB_SCHEMA); + try (var stmt = conn.prepareStatement(insertSql)) { + stmt.executeUpdate(); + } + + // Verify notifications table was created + DatabaseMetaData metaData = conn.getMetaData(); + assertTableExists(metaData, "notifications"); + + // Now run all current migrations (including migration10 which ensures primary key) + var allMigrations = MigrationManager.getMigrations(Constants.DB_SCHEMA); + MigrationManager.runDbosMigrations(conn, Constants.DB_SCHEMA, allMigrations); + + // Verify that the notifications table has a primary key + assertNotificationTableHasPrimaryKey(metaData, "notifications", Constants.DB_SCHEMA); + + // Verify all migrations were applied + var finalVersion = getVersion(conn); + assertEquals(allMigrations.size(), finalVersion); + } + } + + private static void assertNotificationTableHasPrimaryKey(DatabaseMetaData metaData, String tableName, String schemaName) throws Exception { + try (ResultSet rs = metaData.getPrimaryKeys(null, schemaName, tableName)) { + assertTrue(rs.next(), "Table %s should have a primary key in schema %s".formatted(tableName, schemaName)); + assertEquals("message_uuid", rs.getString("COLUMN_NAME"), "Primary key should be on message_uuid column"); + } + } + + /** + * Returns the original migration1 before primary key was added to notifications table. + * This represents the state before migration10 was introduced to defensively add the primary key. + */ + private static String getOriginalMigration1() { + return """ + CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + + CREATE TABLE "%1$s".workflow_status ( + workflow_uuid TEXT PRIMARY KEY, + status TEXT, + name TEXT, + authenticated_user TEXT, + assumed_role TEXT, + authenticated_roles TEXT, + request TEXT, + output TEXT, + error TEXT, + executor_id TEXT, + created_at BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000::numeric)::bigint, + updated_at BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000::numeric)::bigint, + application_version TEXT, + application_id TEXT, + class_name VARCHAR(255) DEFAULT NULL, + config_name VARCHAR(255) DEFAULT NULL, + recovery_attempts BIGINT DEFAULT 0, + queue_name TEXT, + workflow_timeout_ms BIGINT, + workflow_deadline_epoch_ms BIGINT, + inputs TEXT, + started_at_epoch_ms BIGINT, + deduplication_id TEXT, + priority INT4 NOT NULL DEFAULT 0 + ); + + CREATE INDEX workflow_status_created_at_index ON "%1$s".workflow_status (created_at); + CREATE INDEX workflow_status_executor_id_index ON "%1$s".workflow_status (executor_id); + CREATE INDEX workflow_status_status_index ON "%1$s".workflow_status (status); + + ALTER TABLE "%1$s".workflow_status + ADD CONSTRAINT uq_workflow_status_queue_name_dedup_id + UNIQUE (queue_name, deduplication_id); + + CREATE TABLE "%1$s".operation_outputs ( + workflow_uuid TEXT NOT NULL, + function_id INT4 NOT NULL, + function_name TEXT NOT NULL DEFAULT '', + output TEXT, + error TEXT, + child_workflow_id TEXT, + PRIMARY KEY (workflow_uuid, function_id), + FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid) + ON UPDATE CASCADE ON DELETE CASCADE + ); + + CREATE TABLE "%1$s".notifications ( + message_uuid TEXT NOT NULL DEFAULT gen_random_uuid(), + destination_uuid TEXT NOT NULL, + topic TEXT, + message TEXT NOT NULL, + created_at_epoch_ms BIGINT NOT NULL DEFAULT (EXTRACT(epoch FROM now()) * 1000::numeric)::bigint, + FOREIGN KEY (destination_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid) + ON UPDATE CASCADE ON DELETE CASCADE + ); + CREATE INDEX idx_workflow_topic ON "%1$s".notifications (destination_uuid, topic); + + -- Create notification function + CREATE OR REPLACE FUNCTION "%1$s".notifications_function() RETURNS TRIGGER AS $$ + DECLARE + payload text := NEW.destination_uuid || '::' || NEW.topic; + BEGIN + PERFORM pg_notify('dbos_notifications_channel', payload); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + -- Create notification trigger + CREATE TRIGGER dbos_notifications_trigger + AFTER INSERT ON "%1$s".notifications + FOR EACH ROW EXECUTE FUNCTION "%1$s".notifications_function(); + + CREATE TABLE "%1$s".workflow_events ( + workflow_uuid TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL, + PRIMARY KEY (workflow_uuid, key), + FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid) + ON UPDATE CASCADE ON DELETE CASCADE + ); + + -- Create events function + CREATE OR REPLACE FUNCTION "%1$s".workflow_events_function() RETURNS TRIGGER AS $$ + DECLARE + payload text := NEW.workflow_uuid || '::' || NEW.key; + BEGIN + PERFORM pg_notify('dbos_workflow_events_channel', payload); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + + -- Create events trigger + CREATE TRIGGER dbos_workflow_events_trigger + AFTER INSERT ON "%1$s".workflow_events + FOR EACH ROW EXECUTE FUNCTION "%1$s".workflow_events_function(); + + CREATE TABLE "%1$s".streams ( + workflow_uuid TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL, + "offset" INT4 NOT NULL, + PRIMARY KEY (workflow_uuid, key, "offset"), + FOREIGN KEY (workflow_uuid) REFERENCES "%1$s".workflow_status(workflow_uuid) + ON UPDATE CASCADE ON DELETE CASCADE + ); + + CREATE TABLE "%1$s".event_dispatch_kv ( + service_name TEXT NOT NULL, + workflow_fn_name TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT, + update_seq NUMERIC(38,0), + update_time NUMERIC(38,15), + PRIMARY KEY (service_name, workflow_fn_name, key) + ); + """; + } } diff --git a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java index ae9c2983..757bac3c 100644 --- a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java +++ b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java @@ -215,7 +215,7 @@ public static List getWorkflowRows(DataSource ds) throws SQLE public static List getWorkflowRows(DataSource ds, String schema) throws SQLException { schema = SystemDatabase.sanitizeSchema(schema); - String sql = "SELECT * FROM %s.workflow_status ORDER BY created_at".formatted(schema); + String sql = "SELECT * FROM \"%s\".workflow_status ORDER BY created_at".formatted(schema); try (var conn = ds.getConnection(); var stmt = conn.createStatement(); var rs = stmt.executeQuery(sql)) { @@ -235,7 +235,7 @@ public static WorkflowStatusRow getWorkflowRow(DataSource ds, String workflowId) public static WorkflowStatusRow getWorkflowRow(DataSource ds, String workflowId, String schema) throws SQLException { schema = SystemDatabase.sanitizeSchema(schema); - var sql = "SELECT * FROM %s.workflow_status WHERE workflow_uuid = ?".formatted(schema); + var sql = "SELECT * FROM \"%s\".workflow_status WHERE workflow_uuid = ?".formatted(schema); try (var conn = ds.getConnection(); var stmt = conn.prepareStatement(sql)) { stmt.setString(1, workflowId); @@ -258,7 +258,7 @@ public static List getStepRows( DataSource ds, String workflowId, String schema) throws SQLException { schema = SystemDatabase.sanitizeSchema(schema); var sql = - "SELECT * FROM %s.operation_outputs WHERE workflow_uuid = ? ORDER BY function_id" + "SELECT * FROM \"%s\".operation_outputs WHERE workflow_uuid = ? ORDER BY function_id" .formatted(schema); try (var conn = ds.getConnection(); var stmt = conn.prepareStatement(sql)) { @@ -288,7 +288,7 @@ public static List getWorkflowEvents(DataSource ds, String workflowId, St try (var conn = ds.getConnection(); ) { var stmt = conn.prepareStatement( - "SELECT * FROM %s.workflow_events WHERE workflow_uuid = ?".formatted(schema)); + "SELECT * FROM \"%s\".workflow_events WHERE workflow_uuid = ?".formatted(schema)); stmt.setString(1, workflowId); var rs = stmt.executeQuery(); List rows = new ArrayList<>(); @@ -316,7 +316,7 @@ public static List getWorkflowEventHistory( try (var conn = ds.getConnection(); ) { var stmt = conn.prepareStatement( - "SELECT * FROM %s.workflow_events_history WHERE workflow_uuid = ?".formatted(schema)); + "SELECT * FROM \"%s\".workflow_events_history WHERE workflow_uuid = ?".formatted(schema)); stmt.setString(1, workflowId); var rs = stmt.executeQuery(); List rows = new ArrayList<>(); @@ -340,7 +340,7 @@ public static boolean queueEntriesCleanedUp(DataSource ds, String schema) throws schema = SystemDatabase.sanitizeSchema(schema); var sql = """ - SELECT COUNT(*) FROM %s.workflow_status + SELECT COUNT(*) FROM "%s".workflow_status WHERE queue_name IS NOT NULL AND queue_name != ? AND status IN ('ENQUEUED', 'PENDING') From 1c82e225403b2b796545616eff9b413e10ad8ff8 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Mon, 23 Feb 2026 15:28:34 -0800 Subject: [PATCH 2/6] spotless --- .../transact/migrations/MigrationManager.java | 6 +++-- .../migrations/MigrationManagerTest.java | 22 +++++++++++++------ .../java/dev/dbos/transact/utils/DBUtils.java | 3 ++- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java index ae595526..57b90109 100644 --- a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java +++ b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java @@ -165,7 +165,8 @@ public static void ensureMigrationTable(Connection conn, String schema) { public static int getCurrentSysDbVersion(Connection conn, String schema) { Objects.requireNonNull(schema, "schema must not be null"); var sql = - "SELECT version FROM \"%s\".dbos_migrations ORDER BY version DESC limit 1".formatted(schema); + "SELECT version FROM \"%s\".dbos_migrations ORDER BY version DESC limit 1" + .formatted(schema); try (var stmt = conn.createStatement(); var rs = stmt.executeQuery(sql)) { if (rs.next()) { @@ -211,7 +212,8 @@ static void runDbosMigrations(Connection conn, String schema, List migra } if (rowCount == 0) { - var insertSql = "INSERT INTO \"%s\".dbos_migrations (version) VALUES (?)".formatted(schema); + var insertSql = + "INSERT INTO \"%s\".dbos_migrations (version) VALUES (?)".formatted(schema); try (var stmt = conn.prepareStatement(insertSql)) { stmt.setLong(1, migrationIndex); stmt.executeUpdate(); diff --git a/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java b/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java index b8b6600b..0ea7093c 100644 --- a/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java +++ b/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java @@ -162,14 +162,16 @@ void testOriginalMigration1ThenAllMigrations_NotificationsPrimaryKey() throws Ex MigrationManager.ensureDbosSchema(conn, Constants.DB_SCHEMA); MigrationManager.ensureMigrationTable(conn, Constants.DB_SCHEMA); - // Run only the original migration1 (before primary key was added) to populate database with initial structure + // Run only the original migration1 (before primary key was added) to populate database with + // initial structure var originalMigration1 = getOriginalMigration1().formatted(Constants.DB_SCHEMA); try (var stmt = conn.createStatement()) { stmt.execute(originalMigration1); } // Update migration version to 1 - var insertSql = "INSERT INTO \"%s\".dbos_migrations (version) VALUES (1)".formatted(Constants.DB_SCHEMA); + var insertSql = + "INSERT INTO \"%s\".dbos_migrations (version) VALUES (1)".formatted(Constants.DB_SCHEMA); try (var stmt = conn.prepareStatement(insertSql)) { stmt.executeUpdate(); } @@ -191,16 +193,22 @@ void testOriginalMigration1ThenAllMigrations_NotificationsPrimaryKey() throws Ex } } - private static void assertNotificationTableHasPrimaryKey(DatabaseMetaData metaData, String tableName, String schemaName) throws Exception { + private static void assertNotificationTableHasPrimaryKey( + DatabaseMetaData metaData, String tableName, String schemaName) throws Exception { try (ResultSet rs = metaData.getPrimaryKeys(null, schemaName, tableName)) { - assertTrue(rs.next(), "Table %s should have a primary key in schema %s".formatted(tableName, schemaName)); - assertEquals("message_uuid", rs.getString("COLUMN_NAME"), "Primary key should be on message_uuid column"); + assertTrue( + rs.next(), + "Table %s should have a primary key in schema %s".formatted(tableName, schemaName)); + assertEquals( + "message_uuid", + rs.getString("COLUMN_NAME"), + "Primary key should be on message_uuid column"); } } /** - * Returns the original migration1 before primary key was added to notifications table. - * This represents the state before migration10 was introduced to defensively add the primary key. + * Returns the original migration1 before primary key was added to notifications table. This + * represents the state before migration10 was introduced to defensively add the primary key. */ private static String getOriginalMigration1() { return """ diff --git a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java index 757bac3c..935773bd 100644 --- a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java +++ b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java @@ -316,7 +316,8 @@ public static List getWorkflowEventHistory( try (var conn = ds.getConnection(); ) { var stmt = conn.prepareStatement( - "SELECT * FROM \"%s\".workflow_events_history WHERE workflow_uuid = ?".formatted(schema)); + "SELECT * FROM \"%s\".workflow_events_history WHERE workflow_uuid = ?" + .formatted(schema)); stmt.setString(1, workflowId); var rs = stmt.executeQuery(); List rows = new ArrayList<>(); From b7a81ab3f725135843116e69b3785fc229166e18 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Mon, 23 Feb 2026 15:48:16 -0800 Subject: [PATCH 3/6] explicitly block schemas with single or double quote --- .../dbos/transact/database/SystemDatabase.java | 4 +--- .../transact/migrations/MigrationManager.java | 4 ++++ .../migrations/MigrationManagerTest.java | 16 +++++++++++++--- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index 67cb14cc..0cc465b4 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -37,9 +37,7 @@ public class SystemDatabase implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(SystemDatabase.class); public static String sanitizeSchema(String schema) { - return Objects.requireNonNullElse(schema, Constants.DB_SCHEMA) - .replace("\0", "") - .replace("\"", "\"\""); + return Objects.requireNonNullElse(schema, Constants.DB_SCHEMA).replace("\0", ""); } private final DataSource dataSource; diff --git a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java index 57b90109..9b5de92e 100644 --- a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java +++ b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java @@ -59,6 +59,10 @@ private static void runMigrations(DataSource ds, String schema) { Objects.requireNonNull(ds, "Data Source must not be null"); schema = SystemDatabase.sanitizeSchema(schema); + if (schema.contains("'") || schema.contains("\"")) { + throw new IllegalArgumentException("Schema name must not contain single or double quotes"); + } + try (var conn = ds.getConnection()) { ensureDbosSchema(conn, schema); ensureMigrationTable(conn, schema); diff --git a/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java b/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java index 0ea7093c..96879186 100644 --- a/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java +++ b/transact/src/test/java/dev/dbos/transact/migrations/MigrationManagerTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import dev.dbos.transact.Constants; @@ -20,6 +21,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; @org.junit.jupiter.api.Timeout(value = 2, unit = java.util.concurrent.TimeUnit.MINUTES) class MigrationManagerTest { @@ -71,6 +74,7 @@ public static void assertTableExists(DatabaseMetaData metaData, String tableName public static void assertTableExists( DatabaseMetaData metaData, String tableName, String schemaName) throws Exception { + schemaName = SystemDatabase.sanitizeSchema(schemaName); try (ResultSet rs = metaData.getTables(null, schemaName, tableName, null)) { assertTrue(rs.next(), "Table %s should exist in schema %s".formatted(tableName, schemaName)); } @@ -92,10 +96,16 @@ public static int getVersion(Connection conn, String schema) throws Exception { } } - @Test - void testRunMigrations_customSchema() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"invalid\"schema", "invalid'schema"}) + void testRunMigrations_fails_invalid_schema(String invalidSchema) throws Exception { + dbosConfig = dbosConfig.withDatabaseSchema(invalidSchema); + assertThrows(IllegalArgumentException.class, () -> MigrationManager.runMigrations(dbosConfig)); + } - var schema = "F8nny_sCHem@-n@m3"; + @ParameterizedTest + @ValueSource(strings = {"F8nny_sCHem@-n@m3", "embedded\0null"}) + void testRunMigrations_customSchema(String schema) throws Exception { dbosConfig = dbosConfig.withDatabaseSchema(schema); MigrationManager.runMigrations(dbosConfig); From 0a30dcac0b39d0b60128a8a31c1c1a76bb48e9e7 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Mon, 23 Feb 2026 16:18:51 -0800 Subject: [PATCH 4/6] fix cli tests --- .../dbos/transact/cli/MigrateCommandTest.java | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/transact-cli/src/test/java/dev/dbos/transact/cli/MigrateCommandTest.java b/transact-cli/src/test/java/dev/dbos/transact/cli/MigrateCommandTest.java index 8846b18e..346d80bb 100644 --- a/transact-cli/src/test/java/dev/dbos/transact/cli/MigrateCommandTest.java +++ b/transact-cli/src/test/java/dev/dbos/transact/cli/MigrateCommandTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import dev.dbos.transact.Constants; +import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.migrations.MigrationManager; import java.io.PrintWriter; @@ -17,6 +18,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import picocli.CommandLine; @Timeout(value = 2, unit = TimeUnit.MINUTES) @@ -72,18 +75,32 @@ public void migrate_twice() throws Exception { assertTrue(checkTable(Constants.DB_SCHEMA, "workflow_status")); } - @Test - public void migrate_custom_schema() throws Exception { - + @ParameterizedTest + @ValueSource(strings = {"invalid\"schema", "invalid'schema"}) + void testRunMigrations_fails_invalid_schema(String schema) throws Exception { assertFalse(checkConnection()); - var schema = "C\"$+0m'"; + var cmd = new CommandLine(new DBOSCommand()); + var sw = new StringWriter(); + cmd.setOut(new PrintWriter(sw)); + + var exitCode = + cmd.execute("migrate", "-D=" + db_url, "-U=" + db_user, "--schema", "%s".formatted(schema)); + assertEquals(1, exitCode); + } + + @ParameterizedTest + @ValueSource(strings = {"F8nny_sCHem@-n@m3", "embedded\0null"}) + public void migrate_custom_schema(String schema) throws Exception { + + assertFalse(checkConnection()); var cmd = new CommandLine(new DBOSCommand()); var sw = new StringWriter(); cmd.setOut(new PrintWriter(sw)); - var exitCode = cmd.execute("migrate", "-D=" + db_url, "-U=" + db_user, "--schema=" + schema); + var exitCode = + cmd.execute("migrate", "-D=" + db_url, "-U=" + db_user, "--schema", "%s".formatted(schema)); assertEquals(0, exitCode); assertTrue(checkConnection()); @@ -105,7 +122,7 @@ static boolean checkTable(String schema, String table) throws SQLException { "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = ? AND table_name = ?)"; try (var conn = DriverManager.getConnection(db_url, db_user, db_password); var stmt = conn.prepareStatement(sql)) { - stmt.setString(1, schema); + stmt.setString(1, SystemDatabase.sanitizeSchema(schema)); stmt.setString(2, table); try (var rs = stmt.executeQuery()) { if (rs.next()) { From 92037c9a8d578aabb3451047292949b338ea6801 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Mon, 23 Feb 2026 17:11:12 -0800 Subject: [PATCH 5/6] add schema check to system db ctor --- .../main/java/dev/dbos/transact/database/SystemDatabase.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index 0cc465b4..9e301795 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -52,6 +52,10 @@ public static String sanitizeSchema(String schema) { private SystemDatabase(DataSource dataSource, String schema, boolean created) { this.schema = sanitizeSchema(schema); + if (schema.contains("'") || schema.contains("\"")) { + throw new IllegalArgumentException("Schema name must not contain single or double quotes"); + } + this.dataSource = dataSource; this.created = created; From 7e6190a85fc5903fb05a8ca41d69b8efdd8581f7 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Mon, 23 Feb 2026 17:27:08 -0800 Subject: [PATCH 6/6] fix sysdb schema check --- .../main/java/dev/dbos/transact/database/SystemDatabase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index 9e301795..38c98422 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -51,11 +51,12 @@ public static String sanitizeSchema(String schema) { private final NotificationService notificationService; private SystemDatabase(DataSource dataSource, String schema, boolean created) { - this.schema = sanitizeSchema(schema); + schema = sanitizeSchema(schema); if (schema.contains("'") || schema.contains("\"")) { throw new IllegalArgumentException("Schema name must not contain single or double quotes"); } + this.schema = schema; this.dataSource = dataSource; this.created = created;