Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,17 @@ class SoQLCommon(dataSource: DataSource,
def isSystemColumnId(name: UserColumnId): Boolean =
SoQLSystemColumns.isSystemColumnId(name)

val universe: Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] = new Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] {
def run[B](f: PostgresUniverse[CT, CV] with SchemaFinderProvider => B): B = {
private class Universe(conn: Connection) extends PostgresUniverse[CT, CV](conn, PostgresUniverseCommon) with SchemaFinderProvider {
lazy val cache: Cache = common.cache
lazy val schemaFinder = new SchemaFinder(common.typeContext.typeNamespace.userTypeForType, cache)
}

val universe: Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] = new Managed[Universe] {
def run[B](f: Universe => B): B = {
val conn = dataSource.getConnection()
try {
conn.setAutoCommit(false)
val u = new PostgresUniverse(conn, PostgresUniverseCommon) with SchemaFinderProvider {
lazy val cache: Cache = common.cache
lazy val schemaFinder = new SchemaFinder(common.typeContext.typeNamespace.userTypeForType, cache)
}
val u = new Universe(conn)
try {
val result = f(u)
u.commit()
Expand All @@ -137,6 +139,25 @@ class SoQLCommon(dataSource: DataSource,
}
}

// Note this has slightly different semantics from `universe` - in
// particular, normal non-lexical returns that close a resourceScope
// are considered a "normal" return, and hence the transaction will
// be _committed_, whereas the `Managed` block returned from
// `universe` treats them as abnormal and hence roll back.
def scopedUniverse(rs: ResourceScope): PostgresUniverse[CT, CV] with SchemaFinderProvider = {
val conn = rs.open(dataSource.getConnection())
conn.setAutoCommit(false)
rs.open(new Universe(conn), transitiveClose = List(conn))(
new Resource[Universe] {
override def close(u: Universe) =
u.commit()
override def closeAbnormally(u: Universe, e: Throwable) {
u.rollback()
}
}
)
}

object PostgresUniverseCommon extends PostgresCommonSupport[SoQLType, SoQLValue] {
val typeContext = common.typeContext

Expand Down
3 changes: 3 additions & 0 deletions dataset-mover/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ com.socrata.coordinator.datasetmover {

additional-acceptable-secondaries = [ "geocoding" ]

# This is optional, but without it datasets in the archival secondary can't be moved
archival-url = "http://archival-server.app.marathon.ENV.socrata.net/"

soda-fountain = ${common-database} {
host = "XXXX"
database = "XXXX"
Expand Down
1 change: 1 addition & 0 deletions dataset-mover/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Dependencies._
name := "dataset-mover"

libraryDependencies ++= Seq(
"org.xerial" % "sqlite-jdbc" % "3.46.0.0"
)

assembly/test := {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.socrata.datacoordinator.mover

import scala.util.control.Breaks

import java.io.{File, FileInputStream, InputStreamReader, BufferedReader}
import java.nio.charset.StandardCharsets
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.Executors
import java.sql.DriverManager
import sun.misc.{Signal, SignalHandler}

import com.rojoma.simplearm.v2._
import com.typesafe.config.ConfigFactory
import org.apache.log4j.PropertyConfigurator
import org.slf4j.LoggerFactory

import com.socrata.http.client.HttpClientHttpClient
import com.socrata.thirdparty.typesafeconfig.Propertizer

case class DoManagedMoves(serviceConfig: MoverConfig, dryRun: Boolean, ignoreReplication: Boolean, fromInstance: String, toInstance: String, trackerFile: String, systemIdListFile: String, parallelismRaw: String) {
val parallelism = parallelismRaw.toInt

val log = LoggerFactory.getLogger(classOf[DoManagedMoves])

val SIGTERM = new Signal("TERM")
val SIGINT = new Signal("INT")
val shutdownSignalled = new AtomicBoolean(false)
val shutdownSignalHandler = new SignalHandler {
def handle(signal: Signal) {
shutdownSignalled.set(true)
}
}

var oldSIGTERM: SignalHandler = null
var oldSIGINT: SignalHandler = null

try {
oldSIGTERM = Signal.handle(SIGTERM, shutdownSignalHandler)
oldSIGINT = Signal.handle(SIGINT, shutdownSignalHandler)

using(new ResourceScope) { rs =>
implicit val executorShutdown = Resource.executorShutdownNoTimeout
val executorService = rs.open(Executors.newCachedThreadPool)
val httpClient = rs.open(new HttpClientHttpClient(executorService))

val singleMover = new SingleMover(serviceConfig, dryRun, ignoreReplication, executorService, httpClient)

val conn = DriverManager.getConnection("jdbc:sqlite:" + (if(dryRun) ":memory:" else trackerFile))
conn.setAutoCommit(true)

using(conn.createStatement()) { stmt =>
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS progress (system_id integer not null primary key, finished_at text null, finished_successfully boolean null)")

val failure_reason_exists =
using(stmt.executeQuery("select * from pragma_table_info('progress') where name = 'failure_reason'")) { rs =>
rs.next()
}
if(!failure_reason_exists) {
stmt.executeUpdate("ALTER TABLE progress ADD COLUMN failure_reason TEXT NULL");
}
}

val file = locally {
val fis = rs.open(new FileInputStream(systemIdListFile))
val isr = rs.open(new InputStreamReader(fis, StandardCharsets.UTF_8), transitiveClose = List(fis))
val br = rs.open(new BufferedReader(isr), transitiveClose = List(isr))
rs.openUnmanaged(new LinesIterator(br), transitiveClose = List(br))
}

def finish(job: Workers.CompletedJob): Unit = {
job match {
case Workers.SuccessfulJob(systemId, finishedAt) =>
using(conn.prepareStatement("UPDATE progress SET finished_at = ?, finished_successfully = ? WHERE system_id = ?")) { stmt =>
stmt.setString(1, job.finishedAt.toString)
stmt.setBoolean(2, true)
stmt.setLong(3, job.systemId)
stmt.executeUpdate()
}
case Workers.FailedJob(systemId, finishedAt, failureReason) =>
using(conn.prepareStatement("UPDATE progress SET finished_at = ?, finished_successfully = ?, failure_reason = ? WHERE system_id = ?")) { stmt =>
stmt.setString(1, job.finishedAt.toString)
stmt.setBoolean(2, false)
stmt.setString(3, failureReason)
stmt.setLong(4, job.systemId)
stmt.executeUpdate()
}
}
}

val workers = new Workers(parallelism, singleMover)

val break = new Breaks
break.breakable {
for(line <- file) {
if(shutdownSignalled.get) {
log.info("Stopping because of signal")
break.break()
}

val id = line.toLong

val inserted = using(conn.prepareStatement("INSERT INTO progress (system_id) VALUES (?) ON CONFLICT(system_id) DO NOTHING")) { stmt =>
stmt.setLong(1, id)
stmt.executeUpdate() != 0
}

if(inserted) {
for(previousJob <- workers.submit(fromInstance, toInstance, id)) {
finish(previousJob)
}
}
}
}

for(job <- workers.shutdown()) {
finish(job)
}
}
} finally {
if(oldSIGINT != null) Signal.handle(SIGINT, oldSIGINT)
if(oldSIGTERM != null) Signal.handle(SIGTERM, oldSIGTERM)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.socrata.datacoordinator.mover

import scala.annotation.tailrec

import java.io.BufferedReader

class LinesIterator(reader: BufferedReader, keepEndOfLine: Boolean = false) extends Iterator[String] with BufferedIterator[String] {
private var pending: String = null
private var done = false

def hasNext = pending != null || advance()

def head =
if(hasNext) pending
else Iterator.empty.next()

def next(): String =
if(hasNext) {
val result = pending
pending = null
result
} else {
Iterator.empty.next()
}

private def advance(): Boolean =
if(done) {
false
} else {
pending =
if(keepEndOfLine) readIncludingEOL()
else reader.readLine()
done = pending == null
!done
}

private def readIncludingEOL(): String = {
val sb = new StringBuilder

@tailrec
def loop(): Unit = {
reader.read() match {
case -1 =>
// done
case 10 => // \n
sb.append('\n')
case 13 => // \r
reader.mark(1)
reader.read() match {
case 10 =>
sb.append("\r\n")
case _ =>
sb.append('\r')
reader.reset()
}
case other =>
sb.append(other.toChar)
loop()
}
}

loop()

if(sb.nonEmpty) {
sb.toString()
} else {
null
}
}
}
Loading