From 21a57d2ce7c4f4a65d7dd061b0109b3c2645938a Mon Sep 17 00:00:00 2001 From: "Robert J. Macomber" Date: Wed, 27 Sep 2023 13:39:26 -0700 Subject: [PATCH] EN-62834: Bring back exlusive-lock-on-resync At least for non-feedback secondaries --- .../secondary/PlaybackToSecondary.scala | 17 +++++++++++++++-- .../secondary/SecondaryInfo.scala | 8 +++++++- .../sql/SqlSecondaryStoresConfig.scala | 5 +++-- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala index 1b8325730..ef5aa3cec 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory object PlaybackToSecondary { type SuperUniverse[CT, CV] = Universe[CT, CV] with Commitable with + SecondaryStoresConfigProvider with SecondaryManifestProvider with SecondaryMetricsProvider with DatasetMapReaderProvider with @@ -679,13 +680,25 @@ class PlaybackToSecondary[CT, CV](u: PlaybackToSecondary.SuperUniverse[CT, CV], } } + def dsInfoOf(datasetId: DatasetId) = { + u.secondaryStoresConfig.lookup(secondary.storeId) match { + case Some(config) if config.isFeedback => + logger.info("Non-exclusive resync") + u.datasetMapReader.datasetInfo(datasetId, repeatableRead = true) + case _ => + logger.info("exclusive resync") + u.datasetMapWriter.datasetInfo(datasetId, datasetLockTimeout, semiExclusive = true) + } + } + def resync(): Unit = { val mostRecentlyUpdatedCopyInfo = retrying[Option[metadata.CopyInfo]]({ timingReport("resync", "dataset" -> datasetId) { u.commit() // all updates must be committed before we can change the transaction isolation level val r = u.datasetMapReader - r.datasetInfo(datasetId, repeatableRead = true) match { - // transaction isolation level is now set to REPEATABLE READ + val w = u.datasetMapWriter + dsInfoOf(datasetId) match { + // transaction isolation level is now set to REPEATABLE READ or we have a lock on the dataset case Some(datasetInfo) => val allCopies = r.allCopies(datasetInfo).toSeq.sortBy(_.dataVersion) val mostRecentCopy = diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/SecondaryInfo.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/SecondaryInfo.scala index 790255e30..864f31905 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/SecondaryInfo.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/SecondaryInfo.scala @@ -2,7 +2,13 @@ package com.socrata.datacoordinator.secondary import org.joda.time.DateTime -case class SecondaryConfigInfo(storeId: String, nextRunTime: DateTime, runIntervalSeconds: Int, groupName: String) +case class SecondaryConfigInfo( + storeId: String, + nextRunTime: DateTime, + runIntervalSeconds: Int, + groupName: String, + isFeedback: Boolean +) trait SecondaryStoresConfig { def lookup(storeId: String): Option[SecondaryConfigInfo] diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/sql/SqlSecondaryStoresConfig.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/sql/SqlSecondaryStoresConfig.scala index debda520e..2554631fa 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/sql/SqlSecondaryStoresConfig.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/sql/SqlSecondaryStoresConfig.scala @@ -12,7 +12,7 @@ class SqlSecondaryStoresConfig(conn: Connection, timingReport: TimingReport) ext def lookup(storeId: String): Option[SecondaryConfigInfo] = { val sql = """ - SELECT store_id, next_run_time, interval_in_seconds, group_name + SELECT store_id, next_run_time, interval_in_seconds, group_name, is_feedback_secondary FROM secondary_stores_config WHERE store_id = ?""".stripMargin @@ -26,7 +26,8 @@ class SqlSecondaryStoresConfig(conn: Connection, timingReport: TimingReport) ext rs.getString("store_id"), new DateTime(rs.getTimestamp("next_run_time").getTime), rs.getInt("interval_in_seconds"), - Option(rs.getString("group_name")).getOrElse(""))) + Option(rs.getString("group_name")).getOrElse(""), + rs.getBoolean("is_feedback_secondary"))) } else { None }