Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@ object SoQLSystemColumns { sc =>
val createdAt = new UserColumnId(":created_at")
val updatedAt = new UserColumnId(":updated_at")
val version = new UserColumnId(":version")

val schemaFragment = UserColumnIdMap[MutatorColumnInfo[SoQLType]](
id -> MutatorColInfo(SoQLID, id),
version -> MutatorColInfo(SoQLVersion, version),
createdAt -> MutatorColInfo(SoQLFixedTimestamp, createdAt),
updatedAt -> MutatorColInfo(SoQLFixedTimestamp, updatedAt)
val createdVersion = new UserColumnId(":created_dataset_version")
val updatedVersion = new UserColumnId(":updated_dataset_version")

val schemaFragment = UserColumnIdMap[MutatorSystemColInfo](
id -> MutatorSystemColInfo(SoQLID, id, optional = false),
version -> MutatorSystemColInfo(SoQLVersion, version, optional = false),
createdAt -> MutatorSystemColInfo(SoQLFixedTimestamp, createdAt, optional = false),
updatedAt -> MutatorSystemColInfo(SoQLFixedTimestamp, updatedAt, optional = false),
createdVersion -> MutatorSystemColInfo(SoQLNumber, createdVersion, optional = true),
updatedVersion -> MutatorSystemColInfo(SoQLNumber, updatedVersion, optional = true)
)

val allSystemColumnIds = schemaFragment.keySet

def isSystemColumnId(name: UserColumnId): Boolean =
name.underlying.startsWith(":") && !name.underlying.startsWith(":@")

case class MutatorColInfo(typ: SoQLType, id: UserColumnId) extends MutatorColumnInfo[SoQLType] {
case class MutatorSystemColInfo(typ: SoQLType, id: UserColumnId, optional: Boolean) extends MutatorColumnInfo[SoQLType] {
def fieldName = Some(ColumnName(id.underlying))
def computationStrategy = None
}
Expand Down Expand Up @@ -164,21 +168,31 @@ class SoQLCommon(dataSource: DataSource,
def findCol(name: UserColumnId) =
schema.getOrElse(name, sys.error(s"No $name column?")).systemId

def findColOpt(name: UserColumnId) =
schema.get(name).map(_.systemId)

val idColumn = findCol(SystemColumns.id)
val createdAtColumn = findCol(SystemColumns.createdAt)
val updatedAtColumn = findCol(SystemColumns.updatedAt)
val versionColumn = findCol(SystemColumns.version)
val createdVersionColumn = findColOpt(SystemColumns.createdVersion)
val updatedVersionColumn = findColOpt(SystemColumns.updatedVersion)

val columnsRequiredForDelete = ColumnIdSet(versionColumn)

val primaryKeyColumn = ctx.pkCol_!

val datasetVersion = SoQLNumber(new java.math.BigDecimal(ctx.copyInfo.dataVersion))

assert(ctx.schema(versionColumn).typeName == typeContext.typeNamespace.nameForType(SoQLVersion))

val allSystemColumns = locally {
val result = MutableColumnIdSet()
for(c <- SystemColumns.allSystemColumnIds) {
result += findCol(c)
for {
(c, desc) <- SystemColumns.schemaFragment.iterator
systemId <- if(desc.optional) findColOpt(c) else Some(findCol(c))
} {
result += systemId
}
result.freeze()
}
Expand All @@ -189,6 +203,8 @@ class SoQLCommon(dataSource: DataSource,
tmp(createdAtColumn) = SoQLFixedTimestamp(transactionStart)
tmp(updatedAtColumn) = SoQLFixedTimestamp(transactionStart)
tmp(versionColumn) = SoQLVersion(version.underlying)
createdVersionColumn.foreach { c => tmp(c) = datasetVersion }
updatedVersionColumn.foreach { c => tmp(c) = datasetVersion }
tmp.freeze()
}

Expand All @@ -212,6 +228,7 @@ class SoQLCommon(dataSource: DataSource,
}
tmp(updatedAtColumn) = SoQLFixedTimestamp(transactionStart)
tmp(versionColumn) = SoQLVersion(newVersion.underlying)
updatedVersionColumn.foreach { c => tmp(c) = datasetVersion }
tmp.freeze()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ object SystemColumns {
val id = ":id"
val createdAt = ":created_at"
val updatedAt = ":updated_at"
val createdVersion = ":created_dataset_version"
val updatedVersion = ":updated_dataset_version"
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,9 @@ object DatasetMutator {

val oldSchema = datasetMap.schema(oldCopy)
val newSchema = datasetMap.schema(newCopy)

// If we want to backfill system columns, this is where we'd do it.

schemaLoader.addColumns(newSchema.values)

val oldCopyContext = new DatasetCopyContext(oldCopy, oldSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ class RepBasedSqlDatasetContentsCopier[CT, CV](conn: Connection, logger: Logger[
def copy(from: DatasetCopyContext[CT], to: DatasetCopyContext[CT]) {
require(from.datasetInfo == to.datasetInfo, "Cannot copy across datasets")
if(to.schema.nonEmpty) {
val toPhysCols = to.schema.values.flatMap(repFor(_).physColumns).mkString(",")
val toPhysCols = from.schema.keys.flatMap { k => to.schema.get(k) }.flatMap(repFor(_).physColumns).mkString(",")
// Same columns in the same order...
val fromPhysCols = to.schema.keys.flatMap { cid =>
val fromPhysCols = from.schema.keys.filter { k => to.schema.contains(k) }.flatMap { cid =>
repFor(from.schema(cid)).physColumns
}.mkString(",")

Expand Down