diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala index 1b832573..ef5aa3ce 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory object PlaybackToSecondary { type SuperUniverse[CT, CV] = Universe[CT, CV] with Commitable with + SecondaryStoresConfigProvider with SecondaryManifestProvider with SecondaryMetricsProvider with DatasetMapReaderProvider with @@ -679,13 +680,25 @@ class PlaybackToSecondary[CT, CV](u: PlaybackToSecondary.SuperUniverse[CT, CV], } } + def dsInfoOf(datasetId: DatasetId) = { + u.secondaryStoresConfig.lookup(secondary.storeId) match { + case Some(config) if config.isFeedback => + logger.info("Non-exclusive resync") + u.datasetMapReader.datasetInfo(datasetId, repeatableRead = true) + case _ => + logger.info("exclusive resync") + u.datasetMapWriter.datasetInfo(datasetId, datasetLockTimeout, semiExclusive = true) + } + } + def resync(): Unit = { val mostRecentlyUpdatedCopyInfo = retrying[Option[metadata.CopyInfo]]({ timingReport("resync", "dataset" -> datasetId) { u.commit() // all updates must be committed before we can change the transaction isolation level val r = u.datasetMapReader - r.datasetInfo(datasetId, repeatableRead = true) match { - // transaction isolation level is now set to REPEATABLE READ + val w = u.datasetMapWriter + dsInfoOf(datasetId) match { + // transaction isolation level is now set to REPEATABLE READ or we have a lock on the dataset case Some(datasetInfo) => val allCopies = r.allCopies(datasetInfo).toSeq.sortBy(_.dataVersion) val mostRecentCopy = diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/SecondaryInfo.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/SecondaryInfo.scala index 790255e3..864f3190 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/SecondaryInfo.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/SecondaryInfo.scala @@ -2,7 +2,13 @@ package com.socrata.datacoordinator.secondary import org.joda.time.DateTime -case class SecondaryConfigInfo(storeId: String, nextRunTime: DateTime, runIntervalSeconds: Int, groupName: String) +case class SecondaryConfigInfo( + storeId: String, + nextRunTime: DateTime, + runIntervalSeconds: Int, + groupName: String, + isFeedback: Boolean +) trait SecondaryStoresConfig { def lookup(storeId: String): Option[SecondaryConfigInfo] diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/sql/SqlSecondaryStoresConfig.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/sql/SqlSecondaryStoresConfig.scala index debda520..2554631f 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/sql/SqlSecondaryStoresConfig.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/sql/SqlSecondaryStoresConfig.scala @@ -12,7 +12,7 @@ class SqlSecondaryStoresConfig(conn: Connection, timingReport: TimingReport) ext def lookup(storeId: String): Option[SecondaryConfigInfo] = { val sql = """ - SELECT store_id, next_run_time, interval_in_seconds, group_name + SELECT store_id, next_run_time, interval_in_seconds, group_name, is_feedback_secondary FROM secondary_stores_config WHERE store_id = ?""".stripMargin @@ -26,7 +26,8 @@ class SqlSecondaryStoresConfig(conn: Connection, timingReport: TimingReport) ext rs.getString("store_id"), new DateTime(rs.getTimestamp("next_run_time").getTime), rs.getInt("interval_in_seconds"), - Option(rs.getString("group_name")).getOrElse(""))) + Option(rs.getString("group_name")).getOrElse(""), + rs.getBoolean("is_feedback_secondary"))) } else { None }