diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala
index e50c7f24..68138f39 100644
--- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala
+++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala
@@ -115,15 +115,17 @@ class SoQLCommon(dataSource: DataSource,
   def isSystemColumnId(name: UserColumnId): Boolean =
     SoQLSystemColumns.isSystemColumnId(name)
 
-  val universe: Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] = new Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] {
-    def run[B](f: PostgresUniverse[CT, CV] with SchemaFinderProvider => B): B = {
+  private class Universe(conn: Connection) extends PostgresUniverse[CT, CV](conn, PostgresUniverseCommon) with SchemaFinderProvider {
+    lazy val cache: Cache = common.cache
+    lazy val schemaFinder = new SchemaFinder(common.typeContext.typeNamespace.userTypeForType, cache)
+  }
+
+  val universe: Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] = new Managed[Universe] {
+    def run[B](f: Universe => B): B = {
       val conn = dataSource.getConnection()
       try {
         conn.setAutoCommit(false)
-        val u = new PostgresUniverse(conn, PostgresUniverseCommon) with SchemaFinderProvider {
-          lazy val cache: Cache = common.cache
-          lazy val schemaFinder = new SchemaFinder(common.typeContext.typeNamespace.userTypeForType, cache)
-        }
+        val u = new Universe(conn)
         try {
           val result = f(u)
           u.commit()
@@ -137,6 +139,34 @@ class SoQLCommon(dataSource: DataSource,
     }
   }
 
+  // Note that this has slightly different semantics from `universe` - in
+  // particular, normal non-lexical returns that close a ResourceScope
+  // are considered a "normal" return, and hence the transaction will
+  // be _committed_, whereas the `Managed` block returned from
+  // `universe` treats them as abnormal and hence rolls back.
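+  //
+  // A minimal usage sketch (hypothetical caller code; `common` names this
+  // SoQLCommon instance and `doWork` is illustrative):
+  //
+  //   using(new ResourceScope) { rs =>
+  //     val u = common.scopedUniverse(rs)
+  //     doWork(u) // any normal close of `rs` commits the transaction;
+  //               // a close due to an exception rolls it back
+  //   }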
+  def scopedUniverse(rs: ResourceScope): PostgresUniverse[CT, CV] with SchemaFinderProvider = {
+    val conn = rs.open(dataSource.getConnection())
+    conn.setAutoCommit(false)
+    rs.open(new Universe(conn), transitiveClose = List(conn))(
+      new Resource[Universe] {
+        override def close(u: Universe) =
+          u.commit()
+        override def closeAbnormally(u: Universe, e: Throwable) {
+          u.rollback()
+        }
+      }
+    )
+  }
+
   object PostgresUniverseCommon extends PostgresCommonSupport[SoQLType, SoQLValue] {
     val typeContext = common.typeContext
diff --git a/dataset-mover/README.md b/dataset-mover/README.md
index e050b5f5..04cedf93 100644
--- a/dataset-mover/README.md
+++ b/dataset-mover/README.md
@@ -58,6 +58,9 @@ com.socrata.coordinator.datasetmover {
   additional-acceptable-secondaries = [ "geocoding" ]
 
+  # This is optional, but without it datasets in the archival secondary can't be moved
+  archival-url = "http://archival-server.app.marathon.ENV.socrata.net/"
+
   soda-fountain = ${common-database} {
     host = "XXXX"
     database = "XXXX"
diff --git a/dataset-mover/build.sbt b/dataset-mover/build.sbt
index e695c2dc..e90b4f68 100644
--- a/dataset-mover/build.sbt
+++ b/dataset-mover/build.sbt
@@ -3,6 +3,7 @@ import Dependencies._
 name := "dataset-mover"
 
 libraryDependencies ++= Seq(
+  "org.xerial" % "sqlite-jdbc" % "3.46.0.0"
 )
 
 assembly/test := {}
diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala
new file mode 100644
index 00000000..20f387df
--- /dev/null
+++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/DoManagedMoves.scala
@@ -0,0 +1,127 @@
+package com.socrata.datacoordinator.mover
+
+import scala.util.control.Breaks
+
+import java.io.{File, FileInputStream, InputStreamReader, BufferedReader}
+import java.nio.charset.StandardCharsets
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.Executors
+import java.sql.DriverManager
+import sun.misc.{Signal, SignalHandler}
+
+import com.rojoma.simplearm.v2._
+import com.typesafe.config.ConfigFactory
+import org.apache.log4j.PropertyConfigurator
+import org.slf4j.LoggerFactory
+
+import com.socrata.http.client.HttpClientHttpClient
+import com.socrata.thirdparty.typesafeconfig.Propertizer
+
+case class DoManagedMoves(serviceConfig: MoverConfig, dryRun: Boolean, ignoreReplication: Boolean, fromInstance: String, toInstance: String, trackerFile: String, systemIdListFile: String, parallelismRaw: String) {
+  val parallelism = parallelismRaw.toInt
+
+  val log = LoggerFactory.getLogger(classOf[DoManagedMoves])
+
+  val SIGTERM = new Signal("TERM")
+  val SIGINT = new Signal("INT")
+  val shutdownSignalled = new AtomicBoolean(false)
+  val shutdownSignalHandler = new SignalHandler {
+    def handle(signal: Signal) {
+      shutdownSignalled.set(true)
+    }
+  }
+
+  var oldSIGTERM: SignalHandler = null
+  var oldSIGINT: SignalHandler = null
+
+  try {
+    oldSIGTERM = Signal.handle(SIGTERM, shutdownSignalHandler)
+    oldSIGINT = Signal.handle(SIGINT, shutdownSignalHandler)
+
+    using(new ResourceScope) { rs =>
+      implicit val executorShutdown = Resource.executorShutdownNoTimeout
+      val executorService = rs.open(Executors.newCachedThreadPool)
+      val httpClient = rs.open(new HttpClientHttpClient(executorService))
+
+      val singleMover = new SingleMover(serviceConfig, dryRun, ignoreReplication, executorService, httpClient)
+
+      val conn = rs.open(DriverManager.getConnection("jdbc:sqlite:" + (if(dryRun) ":memory:" else trackerFile)))
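+      // The tracker records one row per dataset and is updated as each job
+      // finishes, hence auto-commit below; in a dry run it lives in memory,
+      // so no progress is persisted across runs.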
+      conn.setAutoCommit(true)
+
+      using(conn.createStatement()) { stmt =>
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS progress (system_id integer not null primary key, finished_at text null, finished_successfully boolean null)")
+
+        val failure_reason_exists =
+          using(stmt.executeQuery("select * from pragma_table_info('progress') where name = 'failure_reason'")) { rs =>
+            rs.next()
+          }
+        if(!failure_reason_exists) {
+          stmt.executeUpdate("ALTER TABLE progress ADD COLUMN failure_reason TEXT NULL")
+        }
+      }
+
+      val file = locally {
+        val fis = rs.open(new FileInputStream(systemIdListFile))
+        val isr = rs.open(new InputStreamReader(fis, StandardCharsets.UTF_8), transitiveClose = List(fis))
+        val br = rs.open(new BufferedReader(isr), transitiveClose = List(isr))
+        rs.openUnmanaged(new LinesIterator(br), transitiveClose = List(br))
+      }
+
+      def finish(job: Workers.CompletedJob): Unit = {
+        job match {
+          case Workers.SuccessfulJob(systemId, finishedAt) =>
+            using(conn.prepareStatement("UPDATE progress SET finished_at = ?, finished_successfully = ? WHERE system_id = ?")) { stmt =>
+              stmt.setString(1, finishedAt.toString)
+              stmt.setBoolean(2, true)
+              stmt.setLong(3, systemId)
+              stmt.executeUpdate()
+            }
+          case Workers.FailedJob(systemId, finishedAt, failureReason) =>
+            using(conn.prepareStatement("UPDATE progress SET finished_at = ?, finished_successfully = ?, failure_reason = ? WHERE system_id = ?")) { stmt =>
+              stmt.setString(1, finishedAt.toString)
+              stmt.setBoolean(2, false)
+              stmt.setString(3, failureReason)
+              stmt.setLong(4, systemId)
+              stmt.executeUpdate()
+            }
+        }
+      }
+
+      val workers = new Workers(parallelism, singleMover)
+
+      val break = new Breaks
+      break.breakable {
+        for(line <- file) {
+          if(shutdownSignalled.get) {
+            log.info("Stopping because of signal")
+            break.break()
+          }
+
+          val id = line.toLong
+
+          val inserted = using(conn.prepareStatement("INSERT INTO progress (system_id) VALUES (?) ON CONFLICT(system_id) DO NOTHING")) { stmt =>
+            stmt.setLong(1, id)
+            stmt.executeUpdate() != 0
+          }
+
+          if(inserted) {
+            for(previousJob <- workers.submit(fromInstance, toInstance, id)) {
+              finish(previousJob)
+            }
+          }
+        }
+      }
+
+      for(job <- workers.shutdown()) {
+        finish(job)
+      }
+    }
+  } finally {
+    if(oldSIGINT != null) Signal.handle(SIGINT, oldSIGINT)
+    if(oldSIGTERM != null) Signal.handle(SIGTERM, oldSIGTERM)
+  }
+}
diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/LinesIterator.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/LinesIterator.scala
new file mode 100644
index 00000000..c5ef2d38
--- /dev/null
+++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/LinesIterator.scala
@@ -0,0 +1,70 @@
+package com.socrata.datacoordinator.mover
+
+import scala.annotation.tailrec
+
+import java.io.BufferedReader
+
+class LinesIterator(reader: BufferedReader, keepEndOfLine: Boolean = false) extends Iterator[String] with BufferedIterator[String] {
+  private var pending: String = null
+  private var done = false
+
+  def hasNext = pending != null || advance()
+
+  def head =
+    if(hasNext) pending
+    else Iterator.empty.next()
+
+  def next(): String =
+    if(hasNext) {
+      val result = pending
+      pending = null
+      result
+    } else {
+      Iterator.empty.next()
+    }
+
+  private def advance(): Boolean =
+    if(done) {
+      false
+    } else {
+      pending =
+        if(keepEndOfLine) readIncludingEOL()
+        else reader.readLine()
+      done = pending == null
+      !done
+    }
+
+  private def readIncludingEOL(): String = {
+    val sb = new StringBuilder
+
+    @tailrec
+    def loop(): Unit = {
+      reader.read() match {
+        case -1 =>
+          // end of input; the line (if any) ends here
+        case 10 => // \n
+          sb.append('\n')
+        case 13 => // \r
+          reader.mark(1)
+          reader.read() match {
+            case 10 =>
+              sb.append("\r\n")
+            case _ =>
+              sb.append('\r')
+              reader.reset()
+          }
+        case other =>
+          sb.append(other.toChar)
+          loop()
+      }
+    }
+
+    loop()
+
+    if(sb.nonEmpty) {
+      sb.toString()
+    } else {
+      null
+    }
+  }
+}
diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala
index ef4bb7b0..5cdf9c16 100644
--- a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala
+++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Main.scala
@@ -6,7 +6,8 @@ import scala.concurrent.duration._
 import java.io.File
 import java.net.URL
 import java.sql.Connection
-import java.util.concurrent.Executors
+import java.time.Instant
+import java.util.concurrent.{Executors, ExecutorService}
 
 import com.rojoma.simplearm.v2._
 import com.typesafe.config.ConfigFactory
@@ -14,7 +15,7 @@ import org.apache.log4j.PropertyConfigurator
 import org.postgresql.PGConnection
 import org.postgresql.copy.{CopyIn,CopyOut}
 
-import com.socrata.http.client.HttpClientHttpClient
+import com.socrata.http.client.{HttpClient, HttpClientHttpClient}
 import com.socrata.soql.types.SoQLType
 import com.socrata.thirdparty.typesafeconfig.Propertizer
 
@@ -29,53 +30,91 @@ import com.socrata.datacoordinator.truth.metadata.LifecycleStage
 sealed abstract class Main
 
 object Main extends App {
-  if(args.length != 2) {
-    System.err.println("Usage: dataset-mover.jar INTERNAL_NAME TARGET_TRUTH")
-    System.err.println()
-    System.err.println("  INTERNAL_NAME    internal name (e.g., alpha.1234) of the dataset to move")
-    System.err.println("  TARGET_TRUTH     truthstore in which to move it (e.g., bravo)")
-    System.err.println()
-    System.err.println("Unless the SOCRATA_COMMIT_MOVE environment variable is set, all changes")
-    System.err.println("will be rolled back rather than committed.")
-    sys.exit(1)
-  }
+  val dryRun = sys.env.get("SOCRATA_COMMIT_MOVE").isEmpty
+
+  val ignoreReplication = sys.env.get("SOCRATA_IGNORE_REPLICATION").isDefined
+
+  def setup(): MoverConfig = {
+    val serviceConfig = try {
+      new MoverConfig(ConfigFactory.load(), "com.socrata.coordinator.datasetmover")
+    } catch {
+      case e: Exception =>
+        Console.err.println(e)
+        sys.exit(1)
+    }
 
-  val fromInternalName = DatasetInternalName(args(0)).getOrElse {
-    System.err.println("Illegal dataset internal name")
-    sys.exit(1)
+    PropertyConfigurator.configure(Propertizer("log4j", serviceConfig.logProperties))
+
+    serviceConfig
   }
-  val fromDatasetId = fromInternalName.datasetId
-  val toInstance = args(1)
 
+  args match {
+    case Array(internalNameRaw, targetTruthRaw) =>
+      DoSingleMove(setup(), dryRun, ignoreReplication, internalNameRaw, targetTruthRaw)
+    case Array(sourceTruthRaw, targetTruthRaw, trackerFile, systemIdListFile, parallelism) =>
+      DoManagedMoves(setup(), dryRun, ignoreReplication, sourceTruthRaw, targetTruthRaw, trackerFile, systemIdListFile, parallelism)
+    case _ =>
+      System.err.println("Usage:")
+      System.err.println("  dataset-mover.jar INTERNAL_NAME TARGET_TRUTH")
+      System.err.println("  dataset-mover.jar SOURCE_TRUTH TARGET_TRUTH TRACKER_FILE SYSTEM_ID_LIST_FILE PARALLELISM")
+      System.err.println()
+      System.err.println("  INTERNAL_NAME        internal name (e.g., alpha.1234) of the dataset to move")
+      System.err.println("  TARGET_TRUTH         truthstore in which to move it (e.g., bravo)")
+      System.err.println("  SOURCE_TRUTH         truthstore to move datasets out of (e.g., alpha)")
+      System.err.println("  TRACKER_FILE         sqlite file used to record per-dataset progress")
+      System.err.println("  SYSTEM_ID_LIST_FILE  file containing one dataset system id per line")
+      System.err.println("  PARALLELISM          number of moves to run concurrently")
+      System.err.println()
+      System.err.println("Unless the SOCRATA_COMMIT_MOVE environment variable is set, all changes")
+      System.err.println("will be rolled back rather than committed.")
+      System.err.println()
+      System.err.println("If the SOCRATA_IGNORE_REPLICATION environment variable is set, the mover")
+      System.err.println("will NOT wait for replication to complete before moving the dataset. This")
+      System.err.println("option is dangerous; it may require a resync after the move completes.")
+      sys.exit(1)
+  }
+}
 
-  val dryRun = sys.env.get("SOCRATA_COMMIT_MOVE").isEmpty
+case class DoSingleMove(serviceConfig: MoverConfig, dryRun: Boolean, ignoreReplication: Boolean, fromInternalNameRaw: String, toInstance: String) {
+  try {
+    val fromInternalName = DatasetInternalName(fromInternalNameRaw).getOrElse {
+      System.err.println("Illegal dataset internal name")
+      sys.exit(1)
+    }
 
-  val serviceConfig = try {
-    new MoverConfig(ConfigFactory.load(), "com.socrata.coordinator.datasetmover")
+    using(new ResourceScope) { rs =>
+      implicit val executorShutdown = Resource.executorShutdownNoTimeout
+      val executorService = rs.open(Executors.newCachedThreadPool)
+      val httpClient = rs.open(new HttpClientHttpClient(executorService))
+      new SingleMover(serviceConfig, dryRun, ignoreReplication, executorService, httpClient).apply(fromInternalName, toInstance)
+    }
   } catch {
-    case e: Exception =>
-      Console.err.println(e)
+    case e: SingleMover.Bail =>
+      System.err.println(e.getMessage)
       sys.exit(1)
   }
+}
 
-  PropertyConfigurator.configure(Propertizer("log4j", serviceConfig.logProperties))
+object SingleMover {
+  val log = org.slf4j.LoggerFactory.getLogger(classOf[SingleMover])
+  case class Bail(msg: String) extends Exception(msg)
+  def bail(msg: String): Nothing = throw new Bail(msg)
+}
 
-  val log = org.slf4j.LoggerFactory.getLogger(classOf[Main])
+class SingleMover(serviceConfig: MoverConfig, dryRun: Boolean, ignoreReplication: Boolean, executorService: ExecutorService, httpClient: HttpClient) extends ((DatasetInternalName, String) => Unit) {
+  import SingleMover._
 
-  implicit val executorShutdown = Resource.executorShutdownNoTimeout
+  if(serviceConfig.truths.values.exists(_.poolOptions.isDefined)) {
+    bail("truths must not be c3p0 data sources")
+  }
 
   def fullyReplicated(datasetId: DatasetId, manifest: SecondaryManifest, targetVersion: Long): Boolean = {
-    manifest.stores(datasetId).values.forall{ case (version: Long, _) => version == targetVersion }
+    manifest.stores(datasetId).values.forall{ case (version: Long, _) => version >= targetVersion }
   }
 
   def isPgSecondary(store: String): Boolean = serviceConfig.pgSecondaries.contains(store)
 
-  if(serviceConfig.truths.values.exists(_.poolOptions.isDefined)) {
-    System.err.println("truths must not be c3p0 data sources")
-    sys.exit(1)
-  }
-
   def doCopy(fromConn: Connection, copyOutSql: String, toConn: Connection, copyInSql: String) {
     log.info("Copy from: {}", copyOutSql)
     log.info("Copy to: {}", copyInSql)
@@ -137,13 +172,12 @@ object Main extends App {
       serviceConfig.archivalUrl.map(_ => "archival") ++
       serviceConfig.additionalAcceptableSecondaries
 
-  class Bail(msg: String) extends Exception(msg)
-  def bail(msg: String): Nothing = throw new Bail(msg)
+  def apply(fromInternalName: DatasetInternalName, toInstance: String): Unit = {
+    log.info("Moving {} to {}", fromInternalName: Any, toInstance)
+
+    val fromDatasetId = fromInternalName.datasetId
 
-  try {
     using(new ResourceScope) { rs =>
-      val executorService = rs.open(Executors.newCachedThreadPool)
-      val httpClient = rs.open(new HttpClientHttpClient(executorService))
       val tmpDir = ResourceUtil.Temporary.Directory.scoped[File](rs)
 
       val truths = serviceConfig.truths.iterator.map { case (k, v) =>
@@ -162,35 +196,29 @@ object Main extends App {
       val fromDsInfo = truths(fromInternalName.instance)
       val toDsInfo = truths(toInstance)
 
-      val fromCommon = new SoQLCommon(
-        fromDsInfo.dataSource,
-        fromDsInfo.copyIn,
-        executorService,
-        ServiceMain.tablespaceFinder(serviceConfig.tablespace),
-        NoopTimingReport,
-        allowDdlOnPublishedCopies = true,
-        serviceConfig.writeLockTimeout,
-        fromInternalName.instance,
-        tmpDir,
-        10000.days,
-        10000.days,
-        NullCache
-      )
-
-      val toCommon = new SoQLCommon(
-        toDsInfo.dataSource,
-        toDsInfo.copyIn,
-        executorService,
-        ServiceMain.tablespaceFinder(serviceConfig.tablespace),
-        NoopTimingReport,
-        allowDdlOnPublishedCopies = true,
-        serviceConfig.writeLockTimeout,
-        toInstance,
-        tmpDir,
-        10000.days,
-        10000.days,
-        NullCache
-      )
+      val commons = truths.iterator.map { case (instance, dsInfo) =>
+        instance -> new SoQLCommon(
+          dsInfo.dataSource,
+          dsInfo.copyIn,
+          executorService,
+          ServiceMain.tablespaceFinder(serviceConfig.tablespace),
+          NoopTimingReport,
+          allowDdlOnPublishedCopies = true,
+          serviceConfig.writeLockTimeout,
+          instance,
+          tmpDir,
+          10000.days,
+          10000.days,
+          NullCache
+        )
+      }.toMap
+
+      val fromCommon = commons(fromInternalName.instance)
+      val toCommon = commons(toInstance)
+
+      val otherTruthUniverses = (commons -- Seq(fromInternalName.instance, toInstance)).iterator.map { case (instance, common) =>
+        instance -> common.scopedUniverse(rs)
+      }.toMap
 
       for {
         fromLockUniverse <- fromCommon.universe
@@ -206,21 +234,20 @@ object Main extends App {
         var dsInfo = fromMapReader.datasetInfo(fromDatasetId).getOrElse {
           bail("Can't find dataset")
         }
-        while(!fullyReplicated(dsInfo.systemId, fromUniverse.secondaryManifest, dsInfo.latestDataVersion)) {
-          fromUniverse.rollback()
-          log.info("zzzzzzz....")
-          Thread.sleep(10000)
-          dsInfo = fromMapReader.datasetInfo(fromDatasetId).getOrElse {
-            bail("Can't find dataset")
+        if(!ignoreReplication) {
+          while(!fullyReplicated(dsInfo.systemId, fromUniverse.secondaryManifest, dsInfo.latestDataVersion)) {
+            fromUniverse.rollback()
+            log.info("zzzzzzz....")
+            Thread.sleep(10000)
+            dsInfo = fromMapReader.datasetInfo(fromDatasetId).getOrElse {
+              bail("Can't find dataset")
+            }
           }
         }
         dsInfo
       }
 
       val stores = fromUniverse.secondaryManifest.stores(fromDsInfo.systemId).keySet
-      if(stores.isEmpty) {
-        bail("Refusing to move dataset that lives in no stores")
-      }
 
       val invalidSecondaries = stores -- acceptableSecondaries
       if(invalidSecondaries.nonEmpty) {
        bail("Refusing to move dataset that lives in " + invalidSecondaries)
@@ -381,7 +408,7 @@ object Main extends App {
            bail("Secondary is in an unsupported store!!! After we checked that it wasn't?!?!?")
        }
 
-        if(claimantId.isDefined) {
+        if(claimantId.isDefined && !ignoreReplication) {
          bail("Secondary manifest record is claimed?")
        }
@@ -393,7 +420,7 @@ object Main extends App {
          bail("Dataset not actually up to date?")
        }
 
-        if(latestSecondaryDataVersion != fromDsInfo.latestDataVersion) {
+        if(latestSecondaryDataVersion < fromDsInfo.latestDataVersion && !ignoreReplication) {
          bail("Dataset not fully replicated after we checked that it was?")
        }
@@ -410,6 +437,55 @@ object Main extends App {
         }
       }
 
+      log.info("Copying collocation_manifest records to the new truth...")
+      // Any collocation_manifest records held in _this_ truth need to
+      // be put in the other truth and have their reference changed
+      // from the old internal name to the new internal name.
+      for {
+        fromStmt <- managed(fromUniverse.unsafeRawConnection.prepareStatement("select dataset_internal_name_left, dataset_internal_name_right, created_at, updated_at from collocation_manifest where dataset_internal_name_left = ? or dataset_internal_name_right = ?"))
+          .and { stmt =>
+            stmt.setString(1, fromInternalName.underlying)
+            stmt.setString(2, fromInternalName.underlying)
+          }
+        fromResults <- managed(fromStmt.executeQuery())
+        toStmt <- managed(toUniverse.unsafeRawConnection.prepareStatement("insert into collocation_manifest(dataset_internal_name_left, dataset_internal_name_right, created_at, updated_at) values (?, ?, ?, ?)"))
+      } {
+        def switchName(s: String) =
+          if(s == fromInternalName.underlying) toInternalName.underlying else s
+
+        while(fromResults.next()) {
+          log.info("  - {}/{} => {}/{}", fromResults.getString(1), fromResults.getString(2), switchName(fromResults.getString(1)), switchName(fromResults.getString(2)))
+          toStmt.setString(1, switchName(fromResults.getString(1)))
+          toStmt.setString(2, switchName(fromResults.getString(2)))
+          toStmt.setTimestamp(3, fromResults.getTimestamp(3))
+          toStmt.setTimestamp(4, fromResults.getTimestamp(4))
+          toStmt.executeUpdate()
+        }
+      }
+      // ...and for all other truths, we want to simply add a new
+      // record to this truth's manifest table to say "if you were
+      // collocated with old-dataset before, now you're also
+      // collocated with new-dataset".
+      for((truth, universe) <- otherTruthUniverses ++ Map(toInternalName.instance -> toUniverse)) {
+        val count1 =
+          managed(universe.unsafeRawConnection.prepareStatement("insert into collocation_manifest (dataset_internal_name_left, dataset_internal_name_right, created_at, updated_at) select ?, dataset_internal_name_right, created_at, updated_at from collocation_manifest where dataset_internal_name_left = ?"))
+            .and { stmt =>
+              stmt.setString(1, toInternalName.underlying)
+              stmt.setString(2, fromInternalName.underlying)
+            }
+            .run(_.executeUpdate())
+        log.info("  - Created {} records on {} where the old name was left", count1, truth)
+
+        val count2 =
+          managed(universe.unsafeRawConnection.prepareStatement("insert into collocation_manifest (dataset_internal_name_left, dataset_internal_name_right, created_at, updated_at) select dataset_internal_name_left, ?, created_at, updated_at from collocation_manifest where dataset_internal_name_right = ?"))
+            .and { stmt =>
+              stmt.setString(1, toInternalName.underlying)
+              stmt.setString(2, fromInternalName.underlying)
+            }
+            .run(_.executeUpdate())
+        log.info("  - Created {} records on {} where the old name was right", count2, truth)
+      }
+
       log.info("Adding the new name to the PG secondary stores...")
       for(store <- stores if isPgSecondary(store)) {
         for {
@@ -500,17 +576,37 @@ object Main extends App {
       }
 
       // And now we commit the change into the target truth...
+ log.info("Committing target truth") try { if(dryRun) toUniverse.rollback() else toUniverse.commit() } catch { case e: Exception => + rollbackPGSecondary() rollbackArchivalSecondary() throw e } fromUniverse.rollback() + log.info("Committing collocation_manifest changes to other truths") + // Same comment as a above re: what happens if this fails + try { + for((instance, universe) <- otherTruthUniverses) { + log.info("..{}", instance) + if(dryRun) universe.rollback() + else universe.commit() + } + } catch { + case e: Exception => + rollbackPGSecondary() + rollbackArchivalSecondary() + throw e + } + log.info("Informing soda-fountain of the internal name change...") // Same comment as a above re: what happens if this fails try { @@ -605,9 +698,5 @@ object Main extends App { println("Moved " + fromInternalName + " to " + toInternalName) } } - } catch { - case e: Bail => - System.err.println(e.getMessage) - sys.exit(1) } } diff --git a/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Workers.scala b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Workers.scala new file mode 100644 index 00000000..bd15c548 --- /dev/null +++ b/dataset-mover/src/main/scala/com/socrata/datacoordinator/mover/Workers.scala @@ -0,0 +1,69 @@ +package com.socrata.datacoordinator.mover + +import java.time.Instant +import java.util.concurrent.ArrayBlockingQueue + +import com.rojoma.simplearm.v2._ +import org.slf4j.LoggerFactory + +import com.socrata.datacoordinator.id.DatasetInternalName + +class Workers(parallelism: Int, singleMover: (DatasetInternalName, String) => Unit) { + import Workers._ + + private val stdoutMutex = new Object + private var running = 0 + private val queue = new ArrayBlockingQueue[CompletedJob](parallelism) + + def submit(fromInstance: String, toInstance: String, systemId: Long): Option[CompletedJob] = { + val result = + if(running == parallelism) { + val completed = queue.take() + running -= 1 + Some(completed) + } else { + None + } + + val thread = new Thread() { + setName(s"${fromInstance}.${systemId} -> ${toInstance}") + override def run() { + try { + runJob(fromInstance, toInstance, systemId) + } catch { + case e: Exception => + log.error("Unexpected exception running job for {}.{}", fromInstance, systemId.asInstanceOf[AnyRef], e) + queue.add(FailedJob(systemId, Instant.now(), s"Unexpected exception: $e")) + } + } + } + thread.start() + running += 1 + + result + } + + def shutdown(): Iterator[Workers.CompletedJob] = + (0 until running).iterator.map { _ => queue.take() } + + private def runJob(fromInstance: String, toInstance: String, systemId: Long): Unit = { + try { + singleMover(DatasetInternalName(s"${fromInstance}.${systemId}").getOrElse(SingleMover.bail("Invalid dataset internal name")), toInstance) + } catch { + case SingleMover.Bail(msg) => + queue.add(FailedJob(systemId, Instant.now(), msg)) + return + } + queue.add(SuccessfulJob(systemId, Instant.now())) + } +} + +object Workers { + val log = LoggerFactory.getLogger(classOf[Workers]) + sealed abstract class CompletedJob { + val systemId: Long + val finishedAt: Instant + } + case class SuccessfulJob(systemId: Long, finishedAt: Instant) extends CompletedJob + case class FailedJob(systemId: Long, finishedAt: Instant, failureReason: String) extends CompletedJob +}