diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala index e50c7f24..733add31 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala @@ -31,12 +31,16 @@ object SoQLSystemColumns { sc => val createdAt = new UserColumnId(":created_at") val updatedAt = new UserColumnId(":updated_at") val version = new UserColumnId(":version") - - val schemaFragment = UserColumnIdMap[MutatorColumnInfo[SoQLType]]( - id -> MutatorColInfo(SoQLID, id), - version -> MutatorColInfo(SoQLVersion, version), - createdAt -> MutatorColInfo(SoQLFixedTimestamp, createdAt), - updatedAt -> MutatorColInfo(SoQLFixedTimestamp, updatedAt) + val createdVersion = new UserColumnId(":created_dataset_version") + val updatedVersion = new UserColumnId(":updated_dataset_version") + + val schemaFragment = UserColumnIdMap[MutatorSystemColInfo]( + id -> MutatorSystemColInfo(SoQLID, id, optional = false), + version -> MutatorSystemColInfo(SoQLVersion, version, optional = false), + createdAt -> MutatorSystemColInfo(SoQLFixedTimestamp, createdAt, optional = false), + updatedAt -> MutatorSystemColInfo(SoQLFixedTimestamp, updatedAt, optional = false), + createdVersion -> MutatorSystemColInfo(SoQLNumber, createdVersion, optional = true), + updatedVersion -> MutatorSystemColInfo(SoQLNumber, updatedVersion, optional = true) ) val allSystemColumnIds = schemaFragment.keySet @@ -44,7 +48,7 @@ object SoQLSystemColumns { sc => def isSystemColumnId(name: UserColumnId): Boolean = name.underlying.startsWith(":") && !name.underlying.startsWith(":@") - case class MutatorColInfo(typ: SoQLType, id: UserColumnId) extends MutatorColumnInfo[SoQLType] { + case class MutatorSystemColInfo(typ: SoQLType, id: UserColumnId, optional: Boolean) extends MutatorColumnInfo[SoQLType] { def fieldName = Some(ColumnName(id.underlying)) def computationStrategy = None } @@ -164,21 +168,31 @@ class SoQLCommon(dataSource: DataSource, def findCol(name: UserColumnId) = schema.getOrElse(name, sys.error(s"No $name column?")).systemId + def findColOpt(name: UserColumnId) = + schema.get(name).map(_.systemId) + val idColumn = findCol(SystemColumns.id) val createdAtColumn = findCol(SystemColumns.createdAt) val updatedAtColumn = findCol(SystemColumns.updatedAt) val versionColumn = findCol(SystemColumns.version) + val createdVersionColumn = findColOpt(SystemColumns.createdVersion) + val updatedVersionColumn = findColOpt(SystemColumns.updatedVersion) val columnsRequiredForDelete = ColumnIdSet(versionColumn) val primaryKeyColumn = ctx.pkCol_! + val datasetVersion = SoQLNumber(new java.math.BigDecimal(ctx.copyInfo.dataVersion)) + assert(ctx.schema(versionColumn).typeName == typeContext.typeNamespace.nameForType(SoQLVersion)) val allSystemColumns = locally { val result = MutableColumnIdSet() - for(c <- SystemColumns.allSystemColumnIds) { - result += findCol(c) + for { + (c, desc) <- SystemColumns.schemaFragment.iterator + systemId <- if(desc.optional) findColOpt(c) else Some(findCol(c)) + } { + result += systemId } result.freeze() } @@ -189,6 +203,8 @@ class SoQLCommon(dataSource: DataSource, tmp(createdAtColumn) = SoQLFixedTimestamp(transactionStart) tmp(updatedAtColumn) = SoQLFixedTimestamp(transactionStart) tmp(versionColumn) = SoQLVersion(version.underlying) + createdVersionColumn.foreach { c => tmp(c) = datasetVersion } + updatedVersionColumn.foreach { c => tmp(c) = datasetVersion } tmp.freeze() } @@ -212,6 +228,7 @@ class SoQLCommon(dataSource: DataSource, } tmp(updatedAtColumn) = SoQLFixedTimestamp(transactionStart) tmp(versionColumn) = SoQLVersion(newVersion.underlying) + updatedVersionColumn.foreach { c => tmp(c) = datasetVersion } tmp.freeze() } } diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/soql/SystemColumns.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/soql/SystemColumns.scala index b653cc9f..1e456574 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/soql/SystemColumns.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/soql/SystemColumns.scala @@ -4,4 +4,6 @@ object SystemColumns { val id = ":id" val createdAt = ":created_at" val updatedAt = ":updated_at" + val createdVersion = ":created_dataset_version" + val updatedVersion = ":updated_dataset_version" } diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/DatabaseAccessors.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/DatabaseAccessors.scala index c8d9e8af..ba6f17f9 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/DatabaseAccessors.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/DatabaseAccessors.scala @@ -365,6 +365,9 @@ object DatasetMutator { val oldSchema = datasetMap.schema(oldCopy) val newSchema = datasetMap.schema(newCopy) + + // If we want to backfill system columns, this is where we'd do it. + schemaLoader.addColumns(newSchema.values) val oldCopyContext = new DatasetCopyContext(oldCopy, oldSchema) diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/loader/sql/RepBasedSqlDatasetContentsCopier.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/loader/sql/RepBasedSqlDatasetContentsCopier.scala index d7997336..c662fef4 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/loader/sql/RepBasedSqlDatasetContentsCopier.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/loader/sql/RepBasedSqlDatasetContentsCopier.scala @@ -13,9 +13,9 @@ class RepBasedSqlDatasetContentsCopier[CT, CV](conn: Connection, logger: Logger[ def copy(from: DatasetCopyContext[CT], to: DatasetCopyContext[CT]) { require(from.datasetInfo == to.datasetInfo, "Cannot copy across datasets") if(to.schema.nonEmpty) { - val toPhysCols = to.schema.values.flatMap(repFor(_).physColumns).mkString(",") + val toPhysCols = from.schema.keys.flatMap { k => to.schema.get(k) }.flatMap(repFor(_).physColumns).mkString(",") // Same columns in the same order... - val fromPhysCols = to.schema.keys.flatMap { cid => + val fromPhysCols = from.schema.keys.filter { k => to.schema.contains(k) }.flatMap { cid => repFor(from.schema(cid)).physColumns }.mkString(",")