Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 158 additions & 14 deletions catalog/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"iter"
"strconv"
"strings"
_ "unsafe"

"github.com/apache/iceberg-go"
Expand Down Expand Up @@ -50,20 +51,25 @@ const (
locationPropsKey = "Location"

// Table metadata location pointer.
metadataLocationPropsKey = "metadata_location"
metadataLocationPropsKey = "metadata_location"
previousMetadataLocationPropsKey = "previous_metadata_location"

// The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue
// automatically uses the caller's AWS account ID by default.
// See: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html
CatalogIdKey = "glue.id"

AccessKeyID = "glue.access-key-id"
SecretAccessKey = "glue.secret-access-key"
SessionToken = "glue.session-token"
Region = "glue.region"
Endpoint = "glue.endpoint"
MaxRetries = "glue.max-retries"
RetryMode = "glue.retry-mode"
AccessKeyID = "glue.access-key-id"
SecretAccessKey = "glue.secret-access-key"
SessionToken = "glue.session-token"
Region = "glue.region"
Endpoint = "glue.endpoint"
MaxRetries = "glue.max-retries"
RetryMode = "glue.retry-mode"
SkipArchive = "glue.skip-archive"
SkipArchiveDefault = true

ExternalTable = "EXTERNAL_TABLE"
)

var _ catalog.Catalog = (*Catalog)(nil)
Expand All @@ -75,7 +81,7 @@ func init() {
return nil, err
}

return NewCatalog(WithAwsConfig(awsConfig), WithAwsProperties(AwsProperties(props))), nil
return NewCatalog(WithAwsConfig(awsConfig), WithProperties(props)), nil
}))
}

Expand Down Expand Up @@ -122,12 +128,14 @@ type glueAPI interface {
CreateDatabase(ctx context.Context, params *glue.CreateDatabaseInput, optFns ...func(*glue.Options)) (*glue.CreateDatabaseOutput, error)
DeleteDatabase(ctx context.Context, params *glue.DeleteDatabaseInput, optFns ...func(*glue.Options)) (*glue.DeleteDatabaseOutput, error)
UpdateDatabase(ctx context.Context, params *glue.UpdateDatabaseInput, optFns ...func(*glue.Options)) (*glue.UpdateDatabaseOutput, error)
UpdateTable(ctx context.Context, params *glue.UpdateTableInput, optFns ...func(*glue.Options)) (*glue.UpdateTableOutput, error)
}

// Catalog is the Glue implementation of catalog.Catalog.
type Catalog struct {
// glueSvc is the Glue client used to talk to the Glue Data Catalog; any
// glueAPI implementation can be substituted.
glueSvc glueAPI
// catalogId is the Glue Data Catalog ID taken from the CatalogIdKey
// property, or nil when that property was not provided.
catalogId *string
// awsCfg is the AWS configuration the catalog was created with; it is
// attached to the context when a table's file system is loaded.
awsCfg *aws.Config
// props holds the catalog properties supplied at construction time and is
// consulted for settings such as SkipArchive.
props iceberg.Properties
}

// NewCatalog creates a new instance of glue.Catalog with the given options.
Expand All @@ -139,7 +147,7 @@ func NewCatalog(opts ...Option) *Catalog {
}

var catalogId *string
if val, ok := glueOps.awsProperties[CatalogIdKey]; ok {
if val, ok := glueOps.properties[CatalogIdKey]; ok {
catalogId = &val
} else {
catalogId = nil
Expand All @@ -149,6 +157,7 @@ func NewCatalog(opts ...Option) *Catalog {
glueSvc: glue.NewFromConfig(glueOps.awsConfig),
catalogId: catalogId,
awsCfg: &glueOps.awsConfig,
props: glueOps.properties,
}
}

Expand Down Expand Up @@ -288,6 +297,145 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier,
return createdTable, nil
}

// CommitTable applies updates to the given table and publishes the result to
// Glue.
//
// The steps are: load the current Glue table (remembering its VersionId),
// validate requirements and stage the updated metadata, write the new metadata
// file, and finally call UpdateTable with the remembered VersionId so that a
// concurrent modification is reported as an error instead of being overwritten.
//
// It returns the committed metadata and the location of its metadata file. If
// the staged metadata compares equal to the current metadata, nothing is
// written and the current metadata and location are returned instead.
func (c *Catalog) CommitTable(ctx context.Context, table *table.Table, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
	// The table is dereferenced immediately below, so reject nil explicitly
	// rather than failing with a nil-pointer panic.
	if table == nil {
		return nil, "", errors.New("cannot commit: table is nil")
	}

	tableIdentifier := table.Identifier()
	database, tableName, err := identifierToGlueTable(tableIdentifier)
	if err != nil {
		return nil, "", err
	}

	currentGlueTable, err := c.getTable(ctx, database, tableName)
	if err != nil {
		return nil, "", err
	}

	glueTableVersionID := currentGlueTable.VersionId
	currentTable, err := c.glueToIcebergTable(ctx, currentGlueTable)
	if err != nil {
		return nil, "", err
	}

	updatedStageTable, err := c.updateAndStageTable(ctx, table, tableIdentifier, requirements, updates)
	if err != nil {
		return nil, "", err
	}

	// No change; do nothing.
	// NOTE(review): this compares the Metadata interface values with ==, which
	// is identity for pointer-backed implementations — confirm whether a
	// structural comparison is intended here.
	if updatedStageTable.Metadata() == currentTable.Metadata() {
		return currentTable.Metadata(), currentTable.MetadataLocation(), nil
	}

	// Check the version before writing anything so that a table we cannot
	// commit to does not leave an orphaned metadata file behind.
	if glueTableVersionID == nil {
		return nil, "", fmt.Errorf("cannot commit %s.%s because Glue Table version id is missing", database, tableName)
	}

	err = internal.WriteMetadata(ctx, updatedStageTable.Metadata(),
		updatedStageTable.MetadataLocation(), updatedStageTable.Properties())
	if err != nil {
		return nil, "", err
	}

	// Build the Glue parameters on a copy so the current table's own
	// properties map is never mutated (and a nil map cannot cause a panic).
	currentProps := currentTable.Properties()
	newParameters := make(map[string]string, len(currentProps)+3)
	for key, value := range currentProps {
		newParameters[key] = value
	}
	newParameters[tableTypePropsKey] = glueTypeIceberg
	// The pointer must reference the freshly written metadata file, not the
	// table's base location.
	newParameters[metadataLocationPropsKey] = updatedStageTable.MetadataLocation()
	newParameters[previousMetadataLocationPropsKey] = currentTable.MetadataLocation()

	updateTableInput := &glue.UpdateTableInput{
		// nil keeps Glue's default of the caller's account catalog.
		CatalogId:    c.catalogId,
		DatabaseName: &database,
		TableInput: &types.TableInput{
			Name:       &tableName,
			TableType:  aws.String(ExternalTable),
			Parameters: newParameters,
			StorageDescriptor: &types.StorageDescriptor{
				Columns: toColumns(updatedStageTable.Metadata()),
				// The storage descriptor describes where the table lives,
				// i.e. the table location rather than the metadata file.
				Location: aws.String(updatedStageTable.Location()),
			},
			Description: aws.String(updatedStageTable.Properties()["Description"]),
		},
		SkipArchive: aws.Bool(c.props.GetBool(SkipArchive, SkipArchiveDefault)),
		VersionId:   glueTableVersionID,
	}

	if _, err := c.glueSvc.UpdateTable(ctx, updateTableInput); err != nil {
		// The SDK returns freshly allocated exception values, so they have to
		// be matched by type with errors.As; errors.Is against a new pointer
		// never matches.
		var (
			notFound *types.EntityNotFoundException
			conflict *types.ConcurrentModificationException
		)
		switch {
		case errors.As(err, &notFound):
			return nil, "", fmt.Errorf("table does not exist: %s.%s (Glue table version %s): %w",
				database, tableName, *glueTableVersionID, err)
		case errors.As(err, &conflict):
			return nil, "", fmt.Errorf("cannot commit %s.%s because Glue detected concurrent update to table version %s: %w",
				database, tableName, *glueTableVersionID, err)
		default:
			return nil, "", err
		}
	}

	return updatedStageTable.Metadata(), updatedStageTable.MetadataLocation(), nil
}

// updateAndStageTable validates requirements against the current table's
// metadata, applies updates to it and returns a table value that carries the
// updated metadata together with the location its metadata file should be
// written to. This function does not write the metadata file itself.
//
// currentTable may be nil; in that case the requirements are validated against
// nil metadata and the new metadata version is 0. identifier is used for the
// returned table and for error messages.
func (c *Catalog) updateAndStageTable(ctx context.Context, currentTable *table.Table, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (*table.Table, error) {
	var metadata table.Metadata
	newMetadataVersion := 0

	if currentTable != nil {
		metadata = currentTable.Metadata()
		// NOTE(review): confirm that Version() is the metadata file sequence
		// number and not the Iceberg format version; the value is used below
		// to name the next metadata file.
		newMetadataVersion = metadata.Version() + 1
	}

	for _, req := range requirements {
		if err := req.Validate(metadata); err != nil {
			return nil, err
		}
	}

	updatedMetadata, err := table.UpdateTableMetadata(metadata, updates)
	if err != nil {
		return nil, err
	}

	provider, err := table.LoadLocationProvider(updatedMetadata.Location(), updatedMetadata.Properties())
	if err != nil {
		return nil, err
	}

	newMetadataLocation, err := provider.NewTableMetadataFileLocation(newMetadataVersion)
	if err != nil {
		return nil, err
	}

	iofs, err := io.LoadFS(ctx, updatedMetadata.Properties(), newMetadataLocation)
	if err != nil {
		// Use the identifier argument here: currentTable is allowed to be nil,
		// so dereferencing it on the error path would panic.
		return nil, fmt.Errorf("failed to load table %s: %w", strings.Join(identifier, "."), err)
	}

	return table.New(identifier, updatedMetadata, newMetadataLocation, iofs, c), nil
}

// glueToIcebergTable turns a Glue table record into an Iceberg table by reading
// the metadata file referenced by the record's metadata_location parameter.
//
// An error is returned when the parameter is absent or when the file system
// for that location cannot be loaded.
func (c *Catalog) glueToIcebergTable(ctx context.Context, gTable *types.Table) (*table.Table, error) {
	metadataLoc, found := gTable.Parameters[metadataLocationPropsKey]
	if !found {
		return nil, fmt.Errorf("missing metadata location for table %s.%s", *gTable.DatabaseName, *gTable.Name)
	}

	// The catalog's AWS configuration travels with the context so the file
	// system loader can pick it up.
	awsCtx := utils.WithAwsConfig(ctx, c.awsCfg)

	// TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog.
	fsys, err := io.LoadFS(awsCtx, gTable.Parameters, metadataLoc)
	if err != nil {
		return nil, fmt.Errorf("failed to load table %s.%s: %w", *gTable.DatabaseName, *gTable.Name, err)
	}

	ident := TableIdentifier(*gTable.DatabaseName, *gTable.Name)

	return table.NewFromLocation(ident, metadataLoc, fsys, c)
}

// RegisterTable registers a new table using existing metadata.
func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier, metadataLocation string) (*table.Table, error) {
database, tableName, err := identifierToGlueTable(identifier)
Expand Down Expand Up @@ -331,10 +479,6 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier
return c.LoadTable(ctx, identifier, nil)
}

// CommitTable is a placeholder: it ignores all of its arguments and panics
// because committing tables is not implemented for the Glue catalog.
func (c *Catalog) CommitTable(context.Context, *table.Table, []table.Requirement, []table.Update) (table.Metadata, string, error) {
panic("commit table not implemented for Glue Catalog")
}

// DropTable deletes an Iceberg table from the Glue catalog.
func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier) error {
database, tableName, err := identifierToGlueTable(identifier)
Expand Down
11 changes: 5 additions & 6 deletions catalog/glue/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
package glue

import (
"github.com/apache/iceberg-go"
"github.com/aws/aws-sdk-go-v2/aws"
)

// AwsProperties is a map of string property names to string values used to
// configure the Glue catalog.
type AwsProperties map[string]string

// Option is a functional option that modifies the catalog's options value.
type Option func(*options)

// WithAwsConfig sets the AWS configuration for the catalog.
Expand All @@ -32,13 +31,13 @@ func WithAwsConfig(cfg aws.Config) Option {
}
}

func WithAwsProperties(props AwsProperties) Option {
// WithProperties returns an Option that stores props as the catalog's
// properties.
func WithProperties(props iceberg.Properties) Option {
return func(o *options) {
o.awsProperties = props
o.properties = props
}
}

// options collects the settings that Option functions write into.
type options struct {
awsConfig aws.Config
awsProperties AwsProperties
// awsConfig is the AWS configuration set by WithAwsConfig.
awsConfig aws.Config
// properties are the catalog properties set by WithProperties.
properties iceberg.Properties
}
41 changes: 41 additions & 0 deletions catalog/glue/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@ import (
"strings"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/glue/types"
"golang.org/x/exp/maps"
)

const (
icebergFieldID = "iceberg.field.id"
icebergFieldOptional = "iceberg.field.optional"
icebergFieldCurrent = "iceberg.field.current"
)

// schemaToGlueColumns converts an Iceberg schema to a list of Glue columns.
Expand Down Expand Up @@ -107,3 +115,36 @@ func icebergTypeToGlueType(typ iceberg.Type) string {
return "string"
}
}

// toColumns converts every field known to the table metadata into a Glue
// column, one column per distinct field name.
//
// Fields of the current schema are processed first and are marked with
// iceberg.field.current=true; fields that only occur in other schemas are
// added afterwards with iceberg.field.current=false. When the same name occurs
// more than once, the first occurrence wins. Each column also records the
// Iceberg field ID and whether the field is optional.
func toColumns(metadata table.Metadata) []types.Column {
	result := make(map[string]types.Column)

	appendToResult := func(field iceberg.NestedField, isCurrent bool) {
		if _, found := result[field.Name]; found {
			return
		}

		glueColumn := fieldToGlueColumn(field)
		// Writing to a nil map panics, so make sure the parameter map exists
		// before adding the Iceberg annotations.
		if glueColumn.Parameters == nil {
			glueColumn.Parameters = make(map[string]string, 3)
		}
		glueColumn.Parameters[icebergFieldID] = fmt.Sprintf("%d", field.ID)
		// A field is optional exactly when it is not required.
		glueColumn.Parameters[icebergFieldOptional] = fmt.Sprintf("%t", !field.Required)
		glueColumn.Parameters[icebergFieldCurrent] = fmt.Sprintf("%t", isCurrent)
		result[field.Name] = glueColumn
	}

	currentSchema := metadata.CurrentSchema()
	for _, field := range currentSchema.Fields() {
		appendToResult(field, true)
	}

	for _, schema := range metadata.Schemas() {
		if schema.ID == currentSchema.ID {
			continue
		}

		for _, field := range schema.Fields() {
			appendToResult(field, false)
		}
	}

	// NOTE(review): the columns come out of a map, so their order is not
	// stable between calls — confirm whether Glue consumers rely on schema
	// order and, if so, collect the columns in insertion order instead.
	return maps.Values(result)
}
69 changes: 69 additions & 0 deletions table/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,3 +390,72 @@ func NewRemoveSnapshotRefUpdate(ref string) *removeSnapshotRefUpdate {
// Apply always fails with an error wrapping iceberg.ErrNotImplemented; the
// builder is left untouched.
func (u *removeSnapshotRefUpdate) Apply(builder *MetadataBuilder) error {
	notImplemented := fmt.Errorf("%w: remove-snapshot-ref", iceberg.ErrNotImplemented)

	return notImplemented
}

// type MetadataUpdateCtx struct {
// updates []Update
// }

// func (m *MetadataUpdateCtx) AddUpdate(update Update) {
// m.updates = append(m.updates, update)
// }

// func (m *MetadataUpdateCtx) IsAddedSnapshot(snapshotID int64) bool {
// for _, update := range m.updates {
// addSnapshotUpdate, yes := update.(*addSnapshotUpdate)
// if yes && addSnapshotUpdate.Snapshot.SnapshotID == snapshotID {
// return true
// }
// }

// return false
// }

// func (m *MetadataUpdateCtx) IsAddedSchema(schemaID int) bool {
// for _, update := range m.updates {
// addSchemaUpdate, yes := update.(*addSchemaUpdate)
// if yes && addSchemaUpdate.Schema.ID == schemaID {
// return true
// }
// }

// return false
// }

// func (m *MetadataUpdateCtx) IsAddedPartitionSpec(specID int) bool {
// for _, update := range m.updates {
// addPartitionSpecUpdate, yes := update.(*addPartitionSpecUpdate)
// if yes && addPartitionSpecUpdate.Spec.ID() == specID {
// return true
// }
// }

// return false
// }

// func (m *MetadataUpdateCtx) IsAddedSortOrder(sortOrderID int) bool {
// for _, update := range m.updates {
// addSortOrderUpdate, yes := update.(*addSortOrderUpdate)
// if yes && addSortOrderUpdate.SortOrder.OrderID == sortOrderID {
// return true
// }
// }

// return false
// }

// func (m *MetadataUpdateCtx) HasChanges() bool {
// return len(m.updates) > 0
// }

// UpdateTableMetadata applies the given updates, in order, on top of
// baseMetadata and returns the resulting metadata.
//
// A builder is created from baseMetadata and every update is applied to it
// through its Apply method; the first update that fails aborts the operation
// and its error is returned. If all updates succeed, the metadata built from
// the builder is returned.
func UpdateTableMetadata(baseMetadata Metadata, updates []Update) (Metadata, error) {
	builder, err := MetadataBuilderFromBase(baseMetadata)
	if err != nil {
		return nil, err
	}

	// Merely storing the updates on the builder does not change the metadata
	// it produces; each update has to be applied.
	// NOTE(review): this assumes Apply records the update on the builder where
	// that bookkeeping is needed — confirm against MetadataBuilder.
	for _, update := range updates {
		if err := update.Apply(builder); err != nil {
			return nil, err
		}
	}

	return builder.Build()
}
Loading