From 3de15244ca142c15b7430c3258033e6f183eb7a2 Mon Sep 17 00:00:00 2001 From: "Robert J. Macomber" Date: Mon, 3 Jun 2024 14:49:02 -0700 Subject: [PATCH 1/8] Teach the dataset-mover about collocation-manifest --- .../datacoordinator/common/SoQLCommon.scala | 31 +++++ .../socrata/datacoordinator/mover/Main.scala | 118 +++++++++++++----- 2 files changed, 120 insertions(+), 29 deletions(-) diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala index e50c7f24..3249918c 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala @@ -137,6 +137,37 @@ class SoQLCommon(dataSource: DataSource, } } + // Note this has slightly different semantics from `universe` - in + // particular, normal non-lexical returns that close a resourceScope + // are considered a "normal" return, and hence the transaction will + // be _committed_, whereas the `Managed` block returned from + // `universe` treats them as abnormal and hence roll back. + def scopedUniverse(rs: ResourceScope): PostgresUniverse[CT, CV] with SchemaFinderProvider = { + var success = false + val conn = rs.open(dataSource.getConnection) + var result: PostgresUniverse[CT, CV] with SchemaFinderProvider = null + try { + conn.setAutoCommit(false) + result = new PostgresUniverse(conn, PostgresUniverseCommon) with SchemaFinderProvider { + lazy val cache: Cache = common.cache + lazy val schemaFinder = new SchemaFinder(common.typeContext.typeNamespace.userTypeForType, cache) + } + rs.open(result, transitiveClose = List(conn))( + new Resource[PostgresUniverse[CT, CV] with SchemaFinderProvider] { + override def close(u: PostgresUniverse[CT, CV] with SchemaFinderProvider) = + u.commit() + override def closeAbnormally(u: PostgresUniverse[CT, CV] with SchemaFinderProvider, e: Throwable) { + u.rollback() + } + } + ) + success = true + } finally { + if(!success) rs.close(conn) + } + result + } + object PostgresUniverseCommon extends PostgresCommonSupport[SoQLType, SoQLValue] { val typeContext = common.typeContext diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala index ef4bb7b0..b00ae0f8 100644 --- a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala @@ -162,35 +162,29 @@ object Main extends App { val fromDsInfo = truths(fromInternalName.instance) val toDsInfo = truths(toInstance) - val fromCommon = new SoQLCommon( - fromDsInfo.dataSource, - fromDsInfo.copyIn, - executorService, - ServiceMain.tablespaceFinder(serviceConfig.tablespace), - NoopTimingReport, - allowDdlOnPublishedCopies = true, - serviceConfig.writeLockTimeout, - fromInternalName.instance, - tmpDir, - 10000.days, - 10000.days, - NullCache - ) - - val toCommon = new SoQLCommon( - toDsInfo.dataSource, - toDsInfo.copyIn, - executorService, - ServiceMain.tablespaceFinder(serviceConfig.tablespace), - NoopTimingReport, - allowDdlOnPublishedCopies = true, - serviceConfig.writeLockTimeout, - toInstance, - tmpDir, - 10000.days, - 10000.days, - NullCache - ) + val commons = truths.iterator.map { case (instance, dsInfo) => + instance -> new SoQLCommon( + dsInfo.dataSource, + dsInfo.copyIn, + executorService, + 
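// A sketch, NOT part of this patch: the commit/rollback difference that the
// scopedUniverse comment above describes, in miniature. It assumes only APIs
// visible in this patch series; `stopEarly` is a stand-in for any condition
// that triggers an early (non-lexical) return.

import com.rojoma.simplearm.v2._

def viaManaged(common: SoQLCommon, stopEarly: Boolean): Unit =
  for(u <- common.universe) {
    // a non-local return from inside the block is treated as abnormal,
    // so `universe` rolls the transaction back
    if(stopEarly) return
    // ...do work with u; falling off the end of the block commits
  }

def viaScope(common: SoQLCommon, stopEarly: Boolean): Unit =
  using(new ResourceScope) { rs =>
    val u = common.scopedUniverse(rs)
    // the same non-local return closes the scope "normally" here,
    // so the transaction is committed
    if(stopEarly) return
    // ...do work with u; falling off the end also commits, and only a
    // thrown exception rolls back
  }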
ServiceMain.tablespaceFinder(serviceConfig.tablespace),
+      NoopTimingReport,
+      allowDdlOnPublishedCopies = true,
+      serviceConfig.writeLockTimeout,
+      instance,
+      tmpDir,
+      10000.days,
+      10000.days,
+      NullCache
+    )
+  }.toMap
+
+  val fromCommon = commons(fromInternalName.instance)
+  val toCommon = commons(toInstance)
+
+  val otherTruthUniverses = (commons -- Seq(fromInternalName.instance, toInstance)).iterator.map { case (instance, common) =>
+    instance -> common.scopedUniverse(rs)
+  }.toMap
 
   for {
     fromLockUniverse <- fromCommon.universe
@@ -410,6 +404,55 @@
       }
     }
 
+    log.info("Copying collocation_manifest records to the new truth...")
+    // Any collocation_manifest records held in _this_ truth need to
+    // be copied into the target truth with their references changed
+    // from the old internal name to the new internal name.
+    for {
+      fromStmt <- managed(fromUniverse.unsafeRawConnection.prepareStatement("select dataset_internal_name_left, dataset_internal_name_right, created_at, updated_at from collocation_manifest where dataset_internal_name_left = ? or dataset_internal_name_right = ?"))
+        .and { stmt =>
+          stmt.setString(1, fromInternalName.underlying)
+          stmt.setString(2, fromInternalName.underlying)
+        }
+      fromResults <- managed(fromStmt.executeQuery())
+      toStmt <- managed(toUniverse.unsafeRawConnection.prepareStatement("insert into collocation_manifest(dataset_internal_name_left, dataset_internal_name_right, created_at, updated_at) values (?, ?, ?, ?)"))
+    } {
+      def switchName(s: String) =
+        if(s == fromInternalName.underlying) toInternalName.underlying else s
+
+      while(fromResults.next()) {
+        log.info(" - {}/{} => {}/{}", fromResults.getString(1), fromResults.getString(2), switchName(fromResults.getString(1)), switchName(fromResults.getString(2)))
+        toStmt.setString(1, switchName(fromResults.getString(1)))
+        toStmt.setString(2, switchName(fromResults.getString(2)))
+        toStmt.setTimestamp(3, fromResults.getTimestamp(3))
+        toStmt.setTimestamp(4, fromResults.getTimestamp(4))
+        toStmt.executeUpdate()
+      }
+    }
+
+    // and for every other truth (including the target), we simply add
+    // a new record to that truth's manifest table to say "if you were
+    // collocated with old-dataset before, now you're also collocated
+    // with new-dataset"
+    for((truth, universe) <- otherTruthUniverses ++ Map(toInternalName.instance -> toUniverse)) {
+      val count1 =
+        managed(universe.unsafeRawConnection.prepareStatement("insert into collocation_manifest (dataset_internal_name_left, dataset_internal_name_right, created_at, updated_at) select ?, dataset_internal_name_right, created_at, updated_at from collocation_manifest where dataset_internal_name_left = ?"))
+          .and { stmt =>
+            stmt.setString(1, toInternalName.underlying)
+            stmt.setString(2, fromInternalName.underlying)
+          }
+          .run(_.executeUpdate())
+      log.info(" - Created {} records on {} where the old name was left", count1, truth)
+
+      val count2 =
+        managed(universe.unsafeRawConnection.prepareStatement("insert into collocation_manifest (dataset_internal_name_left, dataset_internal_name_right, created_at, updated_at) select dataset_internal_name_left, ?, created_at, updated_at from collocation_manifest where dataset_internal_name_right = ?"))
+          .and { stmt =>
+            stmt.setString(1, toInternalName.underlying)
+            stmt.setString(2, fromInternalName.underlying)
+          }
+          .run(_.executeUpdate())
+      log.info(" - Created {} records on {} where the old name was right", count2, truth)
+    }
+
     log.info("Adding the new name to the PG secondary stores...")
     for(store <- stores if 
isPgSecondary(store)) { for { @@ -500,17 +543,34 @@ object Main extends App { } // And now we commit the change into the target truth... + log.info("Committing target truth") try { if(dryRun) toUniverse.rollback() else toUniverse.commit() } catch { case e: Exception => + rollbackPGSecondary() rollbackArchivalSecondary() throw e } fromUniverse.rollback() + log.info("Committing collocation_manifest changes to other truths") + // Same comment as a above re: what happens if this fails + try { + for((instance, universe) <- otherTruthUniverses) { + log.info("..{}", instance) + if(dryRun) universe.commit() + else universe.rollback() + } + } catch { + case e: Exception => + rollbackPGSecondary() + rollbackArchivalSecondary() + throw e + } + log.info("Informing soda-fountain of the internal name change...") // Same comment as a above re: what happens if this fails try { From 04f008b15f93e1b5d5d4178497eda098b7fafc36 Mon Sep 17 00:00:00 2001 From: "Robert J. Macomber" Date: Tue, 4 Jun 2024 09:34:01 -0700 Subject: [PATCH 2/8] Refactor universe & scopedUniverse --- .../datacoordinator/common/SoQLCommon.scala | 46 ++++++++----------- .../socrata/datacoordinator/mover/Main.scala | 8 ++-- 2 files changed, 22 insertions(+), 32 deletions(-) diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala index 3249918c..68138f39 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala @@ -115,15 +115,17 @@ class SoQLCommon(dataSource: DataSource, def isSystemColumnId(name: UserColumnId): Boolean = SoQLSystemColumns.isSystemColumnId(name) - val universe: Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] = new Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] { - def run[B](f: PostgresUniverse[CT, CV] with SchemaFinderProvider => B): B = { + private class Universe(conn: Connection) extends PostgresUniverse[CT, CV](conn, PostgresUniverseCommon) with SchemaFinderProvider { + lazy val cache: Cache = common.cache + lazy val schemaFinder = new SchemaFinder(common.typeContext.typeNamespace.userTypeForType, cache) + } + + val universe: Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] = new Managed[Universe] { + def run[B](f: Universe => B): B = { val conn = dataSource.getConnection() try { conn.setAutoCommit(false) - val u = new PostgresUniverse(conn, PostgresUniverseCommon) with SchemaFinderProvider { - lazy val cache: Cache = common.cache - lazy val schemaFinder = new SchemaFinder(common.typeContext.typeNamespace.userTypeForType, cache) - } + val u = new Universe(conn) try { val result = f(u) u.commit() @@ -143,29 +145,17 @@ class SoQLCommon(dataSource: DataSource, // be _committed_, whereas the `Managed` block returned from // `universe` treats them as abnormal and hence roll back. 
def scopedUniverse(rs: ResourceScope): PostgresUniverse[CT, CV] with SchemaFinderProvider = { - var success = false - val conn = rs.open(dataSource.getConnection) - var result: PostgresUniverse[CT, CV] with SchemaFinderProvider = null - try { - conn.setAutoCommit(false) - result = new PostgresUniverse(conn, PostgresUniverseCommon) with SchemaFinderProvider { - lazy val cache: Cache = common.cache - lazy val schemaFinder = new SchemaFinder(common.typeContext.typeNamespace.userTypeForType, cache) - } - rs.open(result, transitiveClose = List(conn))( - new Resource[PostgresUniverse[CT, CV] with SchemaFinderProvider] { - override def close(u: PostgresUniverse[CT, CV] with SchemaFinderProvider) = - u.commit() - override def closeAbnormally(u: PostgresUniverse[CT, CV] with SchemaFinderProvider, e: Throwable) { - u.rollback() - } + val conn = rs.open(dataSource.getConnection()) + conn.setAutoCommit(false) + rs.open(new Universe(conn), transitiveClose = List(conn))( + new Resource[Universe] { + override def close(u: Universe) = + u.commit() + override def closeAbnormally(u: Universe, e: Throwable) { + u.rollback() } - ) - success = true - } finally { - if(!success) rs.close(conn) - } - result + } + ) } object PostgresUniverseCommon extends PostgresCommonSupport[SoQLType, SoQLValue] { diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala index b00ae0f8..44dca9c7 100644 --- a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala @@ -65,7 +65,7 @@ object Main extends App { implicit val executorShutdown = Resource.executorShutdownNoTimeout def fullyReplicated(datasetId: DatasetId, manifest: SecondaryManifest, targetVersion: Long): Boolean = { - manifest.stores(datasetId).values.forall{ case (version: Long, _) => version == targetVersion } + manifest.stores(datasetId).values.forall{ case (version: Long, _) => version >= targetVersion } } def isPgSecondary(store: String): Boolean = @@ -387,7 +387,7 @@ object Main extends App { bail("Dataset not actually up to date?") } - if(latestSecondaryDataVersion != fromDsInfo.latestDataVersion) { + if(latestSecondaryDataVersion < fromDsInfo.latestDataVersion) { bail("Dataset not fully replicated after we checked that it was?") } @@ -561,8 +561,8 @@ object Main extends App { try { for((instance, universe) <- otherTruthUniverses) { log.info("..{}", instance) - if(dryRun) universe.commit() - else universe.rollback() + if(dryRun) universe.rollback() + else universe.commit() } } catch { case e: Exception => From ab676b9879df25ec648070688d80575721f9b6d2 Mon Sep 17 00:00:00 2001 From: "Robert J. 
Macomber" Date: Wed, 5 Jun 2024 12:04:26 -0700 Subject: [PATCH 3/8] Don't refuse to move a secondary-less dataset --- .../main/scala/com/socrata/datacoordinator/mover/Main.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala index 44dca9c7..018711c7 100644 --- a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala @@ -212,9 +212,6 @@ object Main extends App { } val stores = fromUniverse.secondaryManifest.stores(fromDsInfo.systemId).keySet - if(stores.isEmpty) { - bail("Refusing to move dataset that lives in no stores") - } val invalidSecondaries = stores -- acceptableSecondaries if(invalidSecondaries.nonEmpty) { bail("Refusing to move dataset that lives in " + invalidSecondaries) From 1a6072b6517c2b22e101a85dc69e691047de2802 Mon Sep 17 00:00:00 2001 From: "Robert J. Macomber" Date: Thu, 6 Jun 2024 11:15:09 -0700 Subject: [PATCH 4/8] Add a simple parallel mover manager --- dataset-mover/build.sbt | 1 + .../mover/DoManagedMoves.scala | 110 ++++++++++++++++++ .../datacoordinator/mover/LinesIterator.scala | 70 +++++++++++ .../socrata/datacoordinator/mover/Main.scala | 45 ++++--- .../datacoordinator/mover/Workers.scala | 97 +++++++++++++++ 5 files changed, 305 insertions(+), 18 deletions(-) create mode 100644 dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala create mode 100644 dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/LinesIterator.scala create mode 100644 dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Workers.scala diff --git a/dataset-mover/build.sbt b/dataset-mover/build.sbt index e695c2dc..e90b4f68 100644 --- a/dataset-mover/build.sbt +++ b/dataset-mover/build.sbt @@ -3,6 +3,7 @@ import Dependencies._ name := "dataset-mover" libraryDependencies ++= Seq( + "org.xerial" % "sqlite-jdbc" % "3.46.0.0" ) assembly/test := {} diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala new file mode 100644 index 00000000..58baeeea --- /dev/null +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala @@ -0,0 +1,110 @@ +package com.socrata.datacoordinator.mover + +import scala.util.control.Breaks + +import java.io.{File, FileInputStream, InputStreamReader, BufferedReader} +import java.nio.charset.StandardCharsets +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean +import java.sql.DriverManager +import sun.misc.{Signal, SignalHandler} + +import com.rojoma.simplearm.v2._ +import com.typesafe.config.ConfigFactory +import org.apache.log4j.PropertyConfigurator +import org.slf4j.LoggerFactory + +import com.socrata.thirdparty.typesafeconfig.Propertizer + +case class DoManagedMoves(dryRun: Boolean, fromInstance: String, toInstance: String, trackerFile: String, systemIdListFile: String, parallelismRaw: String) { + val parallelism = parallelismRaw.toInt + + val serviceConfig = try { + new MoverConfig(ConfigFactory.load(), "com.socrata.coordinator.datasetmover") + } catch { + case e: Exception => + Console.err.println(e) + sys.exit(1) + } + + PropertyConfigurator.configure(Propertizer("log4j", serviceConfig.logProperties)) + + val log = LoggerFactory.getLogger(classOf[DoManagedMoves]) + + val SIGTERM = new 
Signal("TERM") + val SIGINT = new Signal("INT") + val shutdownSignalled = new AtomicBoolean(false) + val shutdownSignalHandler = new SignalHandler { + private val firstSignal = new AtomicBoolean(false) + def handle(signal: Signal) { + if (firstSignal.getAndSet(false)) { + log.info("Signalling main thread to stop adding jobs") + shutdownSignalled.set(true) + } else { + log.info("Shutdown already in progress") + } + } + } + + var oldSIGTERM: SignalHandler = null + var oldSIGINT: SignalHandler = null + + try { + oldSIGTERM = Signal.handle(SIGTERM, shutdownSignalHandler) + oldSIGINT = Signal.handle(SIGINT, shutdownSignalHandler) + + using(new ResourceScope) { rs => + val conn = DriverManager.getConnection("jdbc:sqlite:" + (if(dryRun) ":memory:" else trackerFile)) + conn.setAutoCommit(true) + + using(conn.createStatement()) { stmt => + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS progress (system_id integer not null primary key, finished_at text null, finished_successfully boolean null)") + } + + val file = locally { + val fis = rs.open(new FileInputStream(systemIdListFile)) + val isr = rs.open(new InputStreamReader(fis, StandardCharsets.UTF_8), transitiveClose = List(fis)) + val br = rs.open(new BufferedReader(isr), transitiveClose = List(isr)) + rs.openUnmanaged(new LinesIterator(br), transitiveClose = List(br)) + } + + def finish(job: Workers.CompletedJob): Unit = { + using(conn.prepareStatement("UPDATE progress SET finished_at = ?, finished_successfully = ? WHERE system_id = ?")) { stmt => + stmt.setString(1, job.finishedAt.toString) + stmt.setBoolean(2, job.finishedSuccessFully) + stmt.setLong(3, job.systemId) + stmt.executeUpdate() + } + } + + val workers = new Workers(parallelism) + + val break = new Breaks + break.breakable { + for(line <- file) { + if(shutdownSignalled.get) break.break() + + val id = line.toLong + + val inserted = using(conn.prepareStatement("INSERT INTO progress (system_id) VALUES (?) 
ON CONFLICT(system_id) DO NOTHING")) { stmt => + stmt.setLong(1, id) + stmt.executeUpdate() != 0 + } + + if(inserted) { + for(previousJob <- workers.submit(fromInstance, toInstance, id)) { + finish(previousJob) + } + } + } + } + + for(job <- workers.shutdown()) { + finish(job) + } + } + } finally { + if(oldSIGINT != null) Signal.handle(SIGINT, oldSIGINT) + if(oldSIGTERM != null) Signal.handle(SIGTERM, oldSIGTERM) + } +} diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/LinesIterator.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/LinesIterator.scala new file mode 100644 index 00000000..c5ef2d38 --- /dev/null +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/LinesIterator.scala @@ -0,0 +1,70 @@ +package com.socrata.datacoordinator.mover + +import scala.annotation.tailrec + +import java.io.BufferedReader + +class LinesIterator(reader: BufferedReader, keepEndOfLine: Boolean = false) extends Iterator[String] with BufferedIterator[String] { + private var pending: String = null + private var done = false + + def hasNext = pending != null || advance() + + def head = + if(hasNext) pending + else Iterator.empty.next() + + def next(): String = + if(hasNext) { + val result = pending + pending = null + result + } else { + Iterator.empty.next() + } + + private def advance(): Boolean = + if(done) { + false + } else { + pending = + if(keepEndOfLine) readIncludingEOL() + else reader.readLine() + done = pending == null + !done + } + + private def readIncludingEOL(): String = { + val sb = new StringBuilder + + @tailrec + def loop(): Unit = { + reader.read() match { + case -1 => + // done + case 10 => // \n + sb.append('\n') + case 13 => // \r + reader.mark(1) + reader.read() match { + case 10 => + sb.append("\r\n") + case _ => + sb.append('\r') + reader.reset() + } + case other => + sb.append(other.toChar) + loop() + } + } + + loop() + + if(sb.nonEmpty) { + sb.toString() + } else { + null + } + } +} diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala index 018711c7..780717c6 100644 --- a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala @@ -29,27 +29,28 @@ import com.socrata.datacoordinator.truth.metadata.LifecycleStage sealed abstract class Main object Main extends App { - if(args.length != 2) { - System.err.println("Usage: dataset-mover.jar INTERNAL_NAME TARGET_TRUTH") - System.err.println() - System.err.println(" INTERNAL_NAME internal name (e.g., alpha.1234) of the dataset to move") - System.err.println(" TARGET_TRUTH truthstore in which to move it (e.g., bravo)") - System.err.println() - System.err.println("Unless the SOCRATA_COMMIT_MOVE environment variable is set, all changes") - System.err.println("will be rolled back rather than committed.") - sys.exit(1) - } + val dryRun = sys.env.get("SOCRATA_COMMIT_MOVE").isEmpty - val fromInternalName = DatasetInternalName(args(0)).getOrElse { - System.err.println("Illegal dataset internal name") - sys.exit(1) + args match { + case Array(internalNameRaw, targetTruthRaw) => + DoSingleMove(dryRun, internalNameRaw, targetTruthRaw) + case Array(sourceTruthRaw, targetTruthRaw, trackerFile, systemIdListFile, parallelism) => + DoManagedMoves(dryRun, sourceTruthRaw, targetTruthRaw, trackerFile, systemIdListFile, parallelism) + case _ => + System.err.println("Usage:") + System.err.println(" 
dataset-mover.jar INTERNAL_NAME TARGET_TRUTH") + System.err.println(" dataset-mover.jar SOURCE_TRUTH TARGET_TRUTH TRACKER_FILE SYSTEM_ID_LIST_FILE PARALLELISM") + System.err.println() + System.err.println(" INTERNAL_NAME internal name (e.g., alpha.1234) of the dataset to move") + System.err.println(" TARGET_TRUTH truthstore in which to move it (e.g., bravo)") + System.err.println() + System.err.println("Unless the SOCRATA_COMMIT_MOVE environment variable is set, all changes") + System.err.println("will be rolled back rather than committed.") + sys.exit(1) } - val fromDatasetId = fromInternalName.datasetId - - val toInstance = args(1) - - val dryRun = sys.env.get("SOCRATA_COMMIT_MOVE").isEmpty +} +case class DoSingleMove(dryRun: Boolean, fromInternalNameRaw: String, toInstance: String) { val serviceConfig = try { new MoverConfig(ConfigFactory.load(), "com.socrata.coordinator.datasetmover") } catch { @@ -62,6 +63,14 @@ object Main extends App { val log = org.slf4j.LoggerFactory.getLogger(classOf[Main]) + log.info("Moving {} to {}", fromInternalNameRaw:Any, toInstance) + + val fromInternalName = DatasetInternalName(fromInternalNameRaw).getOrElse { + System.err.println("Illegal dataset internal name") + sys.exit(1) + } + val fromDatasetId = fromInternalName.datasetId + implicit val executorShutdown = Resource.executorShutdownNoTimeout def fullyReplicated(datasetId: DatasetId, manifest: SecondaryManifest, targetVersion: Long): Boolean = { diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Workers.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Workers.scala new file mode 100644 index 00000000..b50bc265 --- /dev/null +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Workers.scala @@ -0,0 +1,97 @@ +package com.socrata.datacoordinator.mover + +import scala.collection.JavaConverters._ +import scala.util.control.Breaks + +import java.io.{File, InputStreamReader, BufferedReader} +import java.nio.charset.StandardCharsets +import java.time.Instant +import java.util.concurrent.ArrayBlockingQueue + +import com.rojoma.simplearm.v2._ +import org.slf4j.LoggerFactory + +class Workers(parallelism: Int) { + import Workers._ + + private val stdoutMutex = new Object + private var running = 0 + private val queue = new ArrayBlockingQueue[CompletedJob](parallelism) + + def submit(fromInstance: String, toInstance: String, systemId: Long): Option[CompletedJob] = { + val result = + if(running == parallelism) { + val completed = queue.take() + running -= 1 + Some(completed) + } else { + None + } + + val thread = new Thread() { + override def run() { + try { + runJob(fromInstance, toInstance, systemId) + } catch { + case e: Exception => + log.error("Unexpected exception running job for {}.{}", fromInstance, systemId.asInstanceOf[AnyRef], e) + queue.add(CompletedJob(systemId, Instant.now(), false)) + } + } + } + thread.start() + running += 1 + + result + } + + def shutdown(): Iterator[Workers.CompletedJob] = + (0 until running).iterator.map { _ => queue.take() } + + private def runJob(fromInstance: String, toInstance: String, systemId: Long): Unit = { + val args = Seq.newBuilder[String] + args += "java" + for(confFile <- Option(System.getProperty("config.file"))) { + args += "-Dconfig.file=" + confFile + } + args += "-jar" + args += new File(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI).getPath + args += s"$fromInstance.$systemId" + args += toInstance + + val proc = new ProcessBuilder(args.result().asJava) + .inheritIO() + 
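// A sketch, NOT part of this patch: how Workers is meant to be driven.
// Instance names and system ids are made up; the API is the one defined
// in this file at this point in the series.
val workers = new Workers(parallelism = 4)
for(id <- Seq(101L, 102L, 103L)) {
  // Once `parallelism` jobs are in flight, submit() blocks on the
  // completion queue until a job finishes, and hands that completed
  // job back to the caller for bookkeeping.
  for(done <- workers.submit("alpha", "bravo", id))
    println(s"${done.systemId}: ok=${done.finishedSuccessFully}")
}
// shutdown() drains whatever is still in flight.
for(done <- workers.shutdown())
  println(s"${done.systemId}: ok=${done.finishedSuccessFully}")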
.redirectOutput(ProcessBuilder.Redirect.PIPE) + .redirectErrorStream(true) + .start() + + val successful = try { + for { + stdout <- managed(proc.getInputStream) + isr <- managed(new InputStreamReader(stdout, StandardCharsets.UTF_8)) + br <- managed(new BufferedReader(isr)) + } { + for(line <- new LinesIterator(br, keepEndOfLine = true)) { + stdoutMutex.synchronized { + Console.out.print(s"${fromInstance}.${systemId}: ${line}") + Console.out.flush() + } + } + log.info("Subprocess stdout closed: {}.{}", fromInstance, systemId) + proc.waitFor() == 0 + } + } catch { + case e: Exception => + log.error("Exception while running a job for {}.{}", fromInstance, systemId.asInstanceOf[AnyRef], e) + queue.add(CompletedJob(systemId, Instant.now(), false)) + proc.destroyForcibly() + return + } + queue.add(CompletedJob(systemId, Instant.now(), successful)) + } +} + +object Workers { + val log = LoggerFactory.getLogger(classOf[Workers]) + case class CompletedJob(systemId: Long, finishedAt: Instant, finishedSuccessFully: Boolean) +} From 8bdc592ba3b713fa38628fc6b4058fcdc448bdcd Mon Sep 17 00:00:00 2001 From: "Robert J. Macomber" Date: Mon, 10 Jun 2024 12:13:46 -0700 Subject: [PATCH 5/8] More managed-mover improvements --- .../mover/DoManagedMoves.scala | 51 ++++++++---- .../socrata/datacoordinator/mover/Main.scala | 81 +++++++++++-------- .../datacoordinator/mover/Workers.scala | 60 ++++---------- 3 files changed, 98 insertions(+), 94 deletions(-) diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala index 58baeeea..2b6ef766 100644 --- a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala @@ -6,6 +6,7 @@ import java.io.{File, FileInputStream, InputStreamReader, BufferedReader} import java.nio.charset.StandardCharsets import java.time.Instant import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.Executors import java.sql.DriverManager import sun.misc.{Signal, SignalHandler} @@ -14,21 +15,12 @@ import com.typesafe.config.ConfigFactory import org.apache.log4j.PropertyConfigurator import org.slf4j.LoggerFactory +import com.socrata.http.client.HttpClientHttpClient import com.socrata.thirdparty.typesafeconfig.Propertizer -case class DoManagedMoves(dryRun: Boolean, fromInstance: String, toInstance: String, trackerFile: String, systemIdListFile: String, parallelismRaw: String) { +case class DoManagedMoves(serviceConfig: MoverConfig, dryRun: Boolean, fromInstance: String, toInstance: String, trackerFile: String, systemIdListFile: String, parallelismRaw: String) { val parallelism = parallelismRaw.toInt - val serviceConfig = try { - new MoverConfig(ConfigFactory.load(), "com.socrata.coordinator.datasetmover") - } catch { - case e: Exception => - Console.err.println(e) - sys.exit(1) - } - - PropertyConfigurator.configure(Propertizer("log4j", serviceConfig.logProperties)) - val log = LoggerFactory.getLogger(classOf[DoManagedMoves]) val SIGTERM = new Signal("TERM") @@ -54,11 +46,25 @@ case class DoManagedMoves(dryRun: Boolean, fromInstance: String, toInstance: Str oldSIGINT = Signal.handle(SIGINT, shutdownSignalHandler) using(new ResourceScope) { rs => + implicit val executorShutdown = Resource.executorShutdownNoTimeout + val executorService = rs.open(Executors.newCachedThreadPool) + val httpClient = rs.open(new 
HttpClientHttpClient(executorService)) + + val singleMover = new SingleMover(serviceConfig, dryRun, executorService, httpClient) + val conn = DriverManager.getConnection("jdbc:sqlite:" + (if(dryRun) ":memory:" else trackerFile)) conn.setAutoCommit(true) using(conn.createStatement()) { stmt => stmt.executeUpdate("CREATE TABLE IF NOT EXISTS progress (system_id integer not null primary key, finished_at text null, finished_successfully boolean null)") + + val failure_reason_exists = + using(stmt.executeQuery("select * from pragma_table_info('progress') where name = 'failure_reason'")) { rs => + rs.next() + } + if(!failure_reason_exists) { + stmt.executeUpdate("ALTER TABLE progress ADD COLUMN failure_reason TEXT NULL"); + } } val file = locally { @@ -69,15 +75,26 @@ case class DoManagedMoves(dryRun: Boolean, fromInstance: String, toInstance: Str } def finish(job: Workers.CompletedJob): Unit = { - using(conn.prepareStatement("UPDATE progress SET finished_at = ?, finished_successfully = ? WHERE system_id = ?")) { stmt => - stmt.setString(1, job.finishedAt.toString) - stmt.setBoolean(2, job.finishedSuccessFully) - stmt.setLong(3, job.systemId) - stmt.executeUpdate() + job match { + case Workers.SuccessfulJob(systemId, finishedAt) => + using(conn.prepareStatement("UPDATE progress SET finished_at = ?, finished_successfully = ? WHERE system_id = ?")) { stmt => + stmt.setString(1, job.finishedAt.toString) + stmt.setBoolean(2, true) + stmt.setLong(3, job.systemId) + stmt.executeUpdate() + } + case Workers.FailedJob(systemId, finishedAt, failureReason) => + using(conn.prepareStatement("UPDATE progress SET finished_at = ?, finished_successfully = ?, failure_reason = ? WHERE system_id = ?")) { stmt => + stmt.setString(1, job.finishedAt.toString) + stmt.setBoolean(2, false) + stmt.setString(3, failureReason) + stmt.setLong(4, job.systemId) + stmt.executeUpdate() + } } } - val workers = new Workers(parallelism) + val workers = new Workers(parallelism, singleMover) val break = new Breaks break.breakable { diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala index 780717c6..e59605f1 100644 --- a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala @@ -6,7 +6,8 @@ import scala.concurrent.duration._ import java.io.File import java.net.URL import java.sql.Connection -import java.util.concurrent.Executors +import java.time.Instant +import java.util.concurrent.{Executors, ExecutorService} import com.rojoma.simplearm.v2._ import com.typesafe.config.ConfigFactory @@ -14,7 +15,7 @@ import org.apache.log4j.PropertyConfigurator import org.postgresql.PGConnection import org.postgresql.copy.{CopyIn,CopyOut} -import com.socrata.http.client.HttpClientHttpClient +import com.socrata.http.client.{HttpClient, HttpClientHttpClient} import com.socrata.soql.types.SoQLType import com.socrata.thirdparty.typesafeconfig.Propertizer @@ -31,11 +32,25 @@ sealed abstract class Main object Main extends App { val dryRun = sys.env.get("SOCRATA_COMMIT_MOVE").isEmpty + def setup(): MoverConfig = { + val serviceConfig = try { + new MoverConfig(ConfigFactory.load(), "com.socrata.coordinator.datasetmover") + } catch { + case e: Exception => + Console.err.println(e) + sys.exit(1) + } + + PropertyConfigurator.configure(Propertizer("log4j", serviceConfig.logProperties)) + + serviceConfig + } + args match { case Array(internalNameRaw, 
targetTruthRaw) => - DoSingleMove(dryRun, internalNameRaw, targetTruthRaw) + DoSingleMove(setup(), dryRun, internalNameRaw, targetTruthRaw) case Array(sourceTruthRaw, targetTruthRaw, trackerFile, systemIdListFile, parallelism) => - DoManagedMoves(dryRun, sourceTruthRaw, targetTruthRaw, trackerFile, systemIdListFile, parallelism) + DoManagedMoves(setup(), dryRun, sourceTruthRaw, targetTruthRaw, trackerFile, systemIdListFile, parallelism) case _ => System.err.println("Usage:") System.err.println(" dataset-mover.jar INTERNAL_NAME TARGET_TRUTH") @@ -50,28 +65,38 @@ object Main extends App { } } -case class DoSingleMove(dryRun: Boolean, fromInternalNameRaw: String, toInstance: String) { - val serviceConfig = try { - new MoverConfig(ConfigFactory.load(), "com.socrata.coordinator.datasetmover") +case class DoSingleMove(serviceConfig: MoverConfig, dryRun: Boolean, fromInternalNameRaw: String, toInstance: String) { + try { + val fromInternalName = DatasetInternalName(fromInternalNameRaw).getOrElse { + System.err.println("Illegal dataset internal name") + sys.exit(1) + } + + using(new ResourceScope) { rs => + implicit val executorShutdown = Resource.executorShutdownNoTimeout + val executorService = rs.open(Executors.newCachedThreadPool) + val httpClient = rs.open(new HttpClientHttpClient(executorService)) + new SingleMover(serviceConfig, dryRun, executorService, httpClient).apply(fromInternalName, toInstance) + } } catch { - case e: Exception => - Console.err.println(e) + case e: SingleMover.Bail => + System.err.println(e.getMessage) sys.exit(1) } +} - PropertyConfigurator.configure(Propertizer("log4j", serviceConfig.logProperties)) - - val log = org.slf4j.LoggerFactory.getLogger(classOf[Main]) +object SingleMover { + val log = org.slf4j.LoggerFactory.getLogger(classOf[SingleMover]) + case class Bail(msg: String) extends Exception(msg) + def bail(msg: String): Nothing = throw new Bail(msg) +} - log.info("Moving {} to {}", fromInternalNameRaw:Any, toInstance) +class SingleMover(serviceConfig: MoverConfig, dryRun: Boolean, executorService: ExecutorService, httpClient: HttpClient) extends ((DatasetInternalName, String) => Unit) { + import SingleMover._ - val fromInternalName = DatasetInternalName(fromInternalNameRaw).getOrElse { - System.err.println("Illegal dataset internal name") - sys.exit(1) + if(serviceConfig.truths.values.exists(_.poolOptions.isDefined)) { + bail("truths must not be c3p0 data sources") } - val fromDatasetId = fromInternalName.datasetId - - implicit val executorShutdown = Resource.executorShutdownNoTimeout def fullyReplicated(datasetId: DatasetId, manifest: SecondaryManifest, targetVersion: Long): Boolean = { manifest.stores(datasetId).values.forall{ case (version: Long, _) => version >= targetVersion } @@ -80,11 +105,6 @@ case class DoSingleMove(dryRun: Boolean, fromInternalNameRaw: String, toInstance def isPgSecondary(store: String): Boolean = serviceConfig.pgSecondaries.contains(store) - if(serviceConfig.truths.values.exists(_.poolOptions.isDefined)) { - System.err.println("truths must not be c3p0 data sources") - sys.exit(1) - } - def doCopy(fromConn: Connection, copyOutSql: String, toConn: Connection, copyInSql: String) { log.info("Copy from: {}", copyOutSql) log.info("Copy to: {}", copyInSql) @@ -146,13 +166,12 @@ case class DoSingleMove(dryRun: Boolean, fromInternalNameRaw: String, toInstance serviceConfig.archivalUrl.map(_ => "archival") ++ serviceConfig.additionalAcceptableSecondaries - class Bail(msg: String) extends Exception(msg) - def bail(msg: String): Nothing = 
throw new Bail(msg) + def apply(fromInternalName: DatasetInternalName, toInstance: String): Unit = { + log.info("Moving {} to {}", fromInternalName: Any, toInstance) + + val fromDatasetId = fromInternalName.datasetId - try { using(new ResourceScope) { rs => - val executorService = rs.open(Executors.newCachedThreadPool) - val httpClient = rs.open(new HttpClientHttpClient(executorService)) val tmpDir = ResourceUtil.Temporary.Directory.scoped[File](rs) val truths = serviceConfig.truths.iterator.map { case (k, v) => @@ -671,9 +690,5 @@ case class DoSingleMove(dryRun: Boolean, fromInternalNameRaw: String, toInstance println("Moved " + fromInternalName + " to " + toInternalName) } } - } catch { - case e: Bail => - System.err.println(e.getMessage) - sys.exit(1) } } diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Workers.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Workers.scala index b50bc265..bd15c548 100644 --- a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Workers.scala +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Workers.scala @@ -1,17 +1,14 @@ package com.socrata.datacoordinator.mover -import scala.collection.JavaConverters._ -import scala.util.control.Breaks - -import java.io.{File, InputStreamReader, BufferedReader} -import java.nio.charset.StandardCharsets import java.time.Instant import java.util.concurrent.ArrayBlockingQueue import com.rojoma.simplearm.v2._ import org.slf4j.LoggerFactory -class Workers(parallelism: Int) { +import com.socrata.datacoordinator.id.DatasetInternalName + +class Workers(parallelism: Int, singleMover: (DatasetInternalName, String) => Unit) { import Workers._ private val stdoutMutex = new Object @@ -29,13 +26,14 @@ class Workers(parallelism: Int) { } val thread = new Thread() { + setName(s"${fromInstance}.${systemId} -> ${toInstance}") override def run() { try { runJob(fromInstance, toInstance, systemId) } catch { case e: Exception => log.error("Unexpected exception running job for {}.{}", fromInstance, systemId.asInstanceOf[AnyRef], e) - queue.add(CompletedJob(systemId, Instant.now(), false)) + queue.add(FailedJob(systemId, Instant.now(), s"Unexpected exception: $e")) } } } @@ -49,49 +47,23 @@ class Workers(parallelism: Int) { (0 until running).iterator.map { _ => queue.take() } private def runJob(fromInstance: String, toInstance: String, systemId: Long): Unit = { - val args = Seq.newBuilder[String] - args += "java" - for(confFile <- Option(System.getProperty("config.file"))) { - args += "-Dconfig.file=" + confFile - } - args += "-jar" - args += new File(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI).getPath - args += s"$fromInstance.$systemId" - args += toInstance - - val proc = new ProcessBuilder(args.result().asJava) - .inheritIO() - .redirectOutput(ProcessBuilder.Redirect.PIPE) - .redirectErrorStream(true) - .start() - - val successful = try { - for { - stdout <- managed(proc.getInputStream) - isr <- managed(new InputStreamReader(stdout, StandardCharsets.UTF_8)) - br <- managed(new BufferedReader(isr)) - } { - for(line <- new LinesIterator(br, keepEndOfLine = true)) { - stdoutMutex.synchronized { - Console.out.print(s"${fromInstance}.${systemId}: ${line}") - Console.out.flush() - } - } - log.info("Subprocess stdout closed: {}.{}", fromInstance, systemId) - proc.waitFor() == 0 - } + try { + singleMover(DatasetInternalName(s"${fromInstance}.${systemId}").getOrElse(SingleMover.bail("Invalid dataset internal name")), toInstance) } catch { - case 
e: Exception => - log.error("Exception while running a job for {}.{}", fromInstance, systemId.asInstanceOf[AnyRef], e) - queue.add(CompletedJob(systemId, Instant.now(), false)) - proc.destroyForcibly() + case SingleMover.Bail(msg) => + queue.add(FailedJob(systemId, Instant.now(), msg)) return } - queue.add(CompletedJob(systemId, Instant.now(), successful)) + queue.add(SuccessfulJob(systemId, Instant.now())) } } object Workers { val log = LoggerFactory.getLogger(classOf[Workers]) - case class CompletedJob(systemId: Long, finishedAt: Instant, finishedSuccessFully: Boolean) + sealed abstract class CompletedJob { + val systemId: Long + val finishedAt: Instant + } + case class SuccessfulJob(systemId: Long, finishedAt: Instant) extends CompletedJob + case class FailedJob(systemId: Long, finishedAt: Instant, failureReason: String) extends CompletedJob } From 4b19a063fdb8ddfd41752fc32a6ad872680458a0 Mon Sep 17 00:00:00 2001 From: "Robert J. Macomber" Date: Mon, 5 Aug 2024 16:44:48 -0700 Subject: [PATCH 6/8] Fix shutdown handling --- .../datacoordinator/mover/DoManagedMoves.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala index 2b6ef766..793111ab 100644 --- a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala @@ -27,14 +27,8 @@ case class DoManagedMoves(serviceConfig: MoverConfig, dryRun: Boolean, fromInsta val SIGINT = new Signal("INT") val shutdownSignalled = new AtomicBoolean(false) val shutdownSignalHandler = new SignalHandler { - private val firstSignal = new AtomicBoolean(false) def handle(signal: Signal) { - if (firstSignal.getAndSet(false)) { - log.info("Signalling main thread to stop adding jobs") - shutdownSignalled.set(true) - } else { - log.info("Shutdown already in progress") - } + shutdownSignalled.set(true) } } @@ -99,7 +93,10 @@ case class DoManagedMoves(serviceConfig: MoverConfig, dryRun: Boolean, fromInsta val break = new Breaks break.breakable { for(line <- file) { - if(shutdownSignalled.get) break.break() + if(shutdownSignalled.get) { + log.info("Stopping because of signal") + break.break() + } val id = line.toLong From ade299bb28bb6f425536fa2e2789c9eb22939b37 Mon Sep 17 00:00:00 2001 From: "Robert J. 
Macomber" Date: Tue, 6 Aug 2024 12:02:06 -0700 Subject: [PATCH 7/8] Add "ignore replication" flag --- .../mover/DoManagedMoves.scala | 4 +-- .../socrata/datacoordinator/mover/Main.scala | 34 ++++++++++++------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala index 793111ab..20f387df 100644 --- a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala @@ -18,7 +18,7 @@ import org.slf4j.LoggerFactory import com.socrata.http.client.HttpClientHttpClient import com.socrata.thirdparty.typesafeconfig.Propertizer -case class DoManagedMoves(serviceConfig: MoverConfig, dryRun: Boolean, fromInstance: String, toInstance: String, trackerFile: String, systemIdListFile: String, parallelismRaw: String) { +case class DoManagedMoves(serviceConfig: MoverConfig, dryRun: Boolean, ignoreReplication: Boolean, fromInstance: String, toInstance: String, trackerFile: String, systemIdListFile: String, parallelismRaw: String) { val parallelism = parallelismRaw.toInt val log = LoggerFactory.getLogger(classOf[DoManagedMoves]) @@ -44,7 +44,7 @@ case class DoManagedMoves(serviceConfig: MoverConfig, dryRun: Boolean, fromInsta val executorService = rs.open(Executors.newCachedThreadPool) val httpClient = rs.open(new HttpClientHttpClient(executorService)) - val singleMover = new SingleMover(serviceConfig, dryRun, executorService, httpClient) + val singleMover = new SingleMover(serviceConfig, dryRun, ignoreReplication, executorService, httpClient) val conn = DriverManager.getConnection("jdbc:sqlite:" + (if(dryRun) ":memory:" else trackerFile)) conn.setAutoCommit(true) diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala index e59605f1..5cdf9c16 100644 --- a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala @@ -32,6 +32,8 @@ sealed abstract class Main object Main extends App { val dryRun = sys.env.get("SOCRATA_COMMIT_MOVE").isEmpty + val ignoreReplication = sys.env.get("SOCRATA_IGNORE_REPLICATION").isDefined + def setup(): MoverConfig = { val serviceConfig = try { new MoverConfig(ConfigFactory.load(), "com.socrata.coordinator.datasetmover") @@ -48,9 +50,9 @@ object Main extends App { args match { case Array(internalNameRaw, targetTruthRaw) => - DoSingleMove(setup(), dryRun, internalNameRaw, targetTruthRaw) + DoSingleMove(setup(), dryRun, ignoreReplication, internalNameRaw, targetTruthRaw) case Array(sourceTruthRaw, targetTruthRaw, trackerFile, systemIdListFile, parallelism) => - DoManagedMoves(setup(), dryRun, sourceTruthRaw, targetTruthRaw, trackerFile, systemIdListFile, parallelism) + DoManagedMoves(setup(), dryRun, ignoreReplication, sourceTruthRaw, targetTruthRaw, trackerFile, systemIdListFile, parallelism) case _ => System.err.println("Usage:") System.err.println(" dataset-mover.jar INTERNAL_NAME TARGET_TRUTH") @@ -61,11 +63,15 @@ object Main extends App { System.err.println() System.err.println("Unless the SOCRATA_COMMIT_MOVE environment variable is set, all changes") System.err.println("will be rolled back rather than committed.") + System.err.println() + System.err.println("If the SOCRATA_IGNORE_REPLICATION 
environment variable is set, the mover") + System.err.println("will NOT wait for replication to complete before moving the dataset. This") + System.err.println("option is dangerous; it may require a resync after the move completes.") sys.exit(1) } } -case class DoSingleMove(serviceConfig: MoverConfig, dryRun: Boolean, fromInternalNameRaw: String, toInstance: String) { +case class DoSingleMove(serviceConfig: MoverConfig, dryRun: Boolean, ignoreReplication: Boolean, fromInternalNameRaw: String, toInstance: String) { try { val fromInternalName = DatasetInternalName(fromInternalNameRaw).getOrElse { System.err.println("Illegal dataset internal name") @@ -76,7 +82,7 @@ case class DoSingleMove(serviceConfig: MoverConfig, dryRun: Boolean, fromInterna implicit val executorShutdown = Resource.executorShutdownNoTimeout val executorService = rs.open(Executors.newCachedThreadPool) val httpClient = rs.open(new HttpClientHttpClient(executorService)) - new SingleMover(serviceConfig, dryRun, executorService, httpClient).apply(fromInternalName, toInstance) + new SingleMover(serviceConfig, dryRun, ignoreReplication, executorService, httpClient).apply(fromInternalName, toInstance) } } catch { case e: SingleMover.Bail => @@ -91,7 +97,7 @@ object SingleMover { def bail(msg: String): Nothing = throw new Bail(msg) } -class SingleMover(serviceConfig: MoverConfig, dryRun: Boolean, executorService: ExecutorService, httpClient: HttpClient) extends ((DatasetInternalName, String) => Unit) { +class SingleMover(serviceConfig: MoverConfig, dryRun: Boolean, ignoreReplication: Boolean, executorService: ExecutorService, httpClient: HttpClient) extends ((DatasetInternalName, String) => Unit) { import SingleMover._ if(serviceConfig.truths.values.exists(_.poolOptions.isDefined)) { @@ -228,12 +234,14 @@ class SingleMover(serviceConfig: MoverConfig, dryRun: Boolean, executorService: var dsInfo = fromMapReader.datasetInfo(fromDatasetId).getOrElse { bail("Can't find dataset") } - while(!fullyReplicated(dsInfo.systemId, fromUniverse.secondaryManifest, dsInfo.latestDataVersion)) { - fromUniverse.rollback() - log.info("zzzzzzz....") - Thread.sleep(10000) - dsInfo = fromMapReader.datasetInfo(fromDatasetId).getOrElse { - bail("Can't find dataset") + if(!ignoreReplication) { + while(!fullyReplicated(dsInfo.systemId, fromUniverse.secondaryManifest, dsInfo.latestDataVersion)) { + fromUniverse.rollback() + log.info("zzzzzzz....") + Thread.sleep(10000) + dsInfo = fromMapReader.datasetInfo(fromDatasetId).getOrElse { + bail("Can't find dataset") + } } } dsInfo @@ -400,7 +408,7 @@ class SingleMover(serviceConfig: MoverConfig, dryRun: Boolean, executorService: bail("Secondary is in an unsupported store!!! After we checked that it wasn't?!?!?") } - if(claimantId.isDefined) { + if(claimantId.isDefined && !ignoreReplication) { bail("Secondary manifest record is claimed?") } @@ -412,7 +420,7 @@ class SingleMover(serviceConfig: MoverConfig, dryRun: Boolean, executorService: bail("Dataset not actually up to date?") } - if(latestSecondaryDataVersion < fromDsInfo.latestDataVersion) { + if(latestSecondaryDataVersion < fromDsInfo.latestDataVersion && !ignoreReplication) { bail("Dataset not fully replicated after we checked that it was?") } From 4f7cbfb08e12963d904e9cbc38d0896754c5571f Mon Sep 17 00:00:00 2001 From: "Robert J. 
Macomber" Date: Thu, 15 Aug 2024 14:35:58 -0700 Subject: [PATCH 8/8] Add archival-url to readme --- dataset-mover/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dataset-mover/README.md b/dataset-mover/README.md index e050b5f5..04cedf93 100644 --- a/dataset-mover/README.md +++ b/dataset-mover/README.md @@ -58,6 +58,9 @@ com.socrata.coordinator.datasetmover { additional-acceptable-secondaries = [ "geocoding" ] + # This is optional, but without it datasets in the archival secondary can't be moved + archival-url = "http://archival-server.app.marathon.ENV.socrata.net/" + soda-fountain = ${common-database} { host = "XXXX" database = "XXXX"