diff --git a/catalog/glue/glue.go b/catalog/glue/glue.go index 8e2bc1394..f39ff13c8 100644 --- a/catalog/glue/glue.go +++ b/catalog/glue/glue.go @@ -23,6 +23,7 @@ import ( "fmt" "iter" "strconv" + "strings" _ "unsafe" "github.com/apache/iceberg-go" @@ -50,20 +51,25 @@ const ( locationPropsKey = "Location" // Table metadata location pointer. - metadataLocationPropsKey = "metadata_location" + metadataLocationPropsKey = "metadata_location" + previousMetadataLocationPropsKey = "previous_metadata_location" // The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue // automatically uses the caller's AWS account ID by default. // See: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html CatalogIdKey = "glue.id" - AccessKeyID = "glue.access-key-id" - SecretAccessKey = "glue.secret-access-key" - SessionToken = "glue.session-token" - Region = "glue.region" - Endpoint = "glue.endpoint" - MaxRetries = "glue.max-retries" - RetryMode = "glue.retry-mode" + AccessKeyID = "glue.access-key-id" + SecretAccessKey = "glue.secret-access-key" + SessionToken = "glue.session-token" + Region = "glue.region" + Endpoint = "glue.endpoint" + MaxRetries = "glue.max-retries" + RetryMode = "glue.retry-mode" + SkipArchive = "glue.skip-archive" + SkipArchiveDefault = true + + ExternalTable = "EXTERNAL_TABLE" ) var _ catalog.Catalog = (*Catalog)(nil) @@ -75,7 +81,7 @@ func init() { return nil, err } - return NewCatalog(WithAwsConfig(awsConfig), WithAwsProperties(AwsProperties(props))), nil + return NewCatalog(WithAwsConfig(awsConfig), WithProperties(props)), nil })) } @@ -122,12 +128,14 @@ type glueAPI interface { CreateDatabase(ctx context.Context, params *glue.CreateDatabaseInput, optFns ...func(*glue.Options)) (*glue.CreateDatabaseOutput, error) DeleteDatabase(ctx context.Context, params *glue.DeleteDatabaseInput, optFns ...func(*glue.Options)) (*glue.DeleteDatabaseOutput, error) UpdateDatabase(ctx context.Context, params *glue.UpdateDatabaseInput, optFns ...func(*glue.Options)) (*glue.UpdateDatabaseOutput, error) + UpdateTable(ctx context.Context, params *glue.UpdateTableInput, optFns ...func(*glue.Options)) (*glue.UpdateTableOutput, error) } type Catalog struct { glueSvc glueAPI catalogId *string awsCfg *aws.Config + props iceberg.Properties } // NewCatalog creates a new instance of glue.Catalog with the given options. @@ -139,7 +147,7 @@ func NewCatalog(opts ...Option) *Catalog { } var catalogId *string - if val, ok := glueOps.awsProperties[CatalogIdKey]; ok { + if val, ok := glueOps.properties[CatalogIdKey]; ok { catalogId = &val } else { catalogId = nil @@ -149,6 +157,7 @@ func NewCatalog(opts ...Option) *Catalog { glueSvc: glue.NewFromConfig(glueOps.awsConfig), catalogId: catalogId, awsCfg: &glueOps.awsConfig, + props: glueOps.properties, } } @@ -288,6 +297,145 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, return createdTable, nil } +func (c *Catalog) CommitTable(ctx context.Context, table *table.Table, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) { + tableIdentifier := table.Identifier() + database, tableName, err := identifierToGlueTable(tableIdentifier) + if err != nil { + return nil, "", err + } + + currentGlueTable, err := c.getTable(ctx, database, tableName) + if err != nil { + return nil, "", err + } + + glueTableVersionID := currentGlueTable.VersionId + currentTable, err := c.glueToIcebergTable(ctx, currentGlueTable) + if err != nil { + return nil, "", err + } + + updatedStageTable, err := c.updateAndStageTable(ctx, table, tableIdentifier, requirements, updates) + if err != nil { + return nil, "", err + } + + // no change; do nothing + if currentTable != nil && updatedStageTable.Metadata() == currentTable.Metadata() { + return currentTable.Metadata(), currentTable.MetadataLocation(), nil + } + + err = internal.WriteMetadata(ctx, updatedStageTable.Metadata(), + updatedStageTable.MetadataLocation(), updatedStageTable.Properties()) + if err != nil { + return nil, "", err + } + + if table != nil { + if glueTableVersionID == nil { + return nil, "", fmt.Errorf("cannot commit %s.%s because Glue Table version id is missing", database, tableName) + } + + newParameters := currentTable.Properties() + newParameters[tableTypePropsKey] = glueTypeIceberg + newParameters[metadataLocationPropsKey] = updatedStageTable.Location() + if currentTable != nil { + newParameters[previousMetadataLocationPropsKey] = currentTable.MetadataLocation() + } + + updateTableInput := &glue.UpdateTableInput{ + DatabaseName: &database, + TableInput: &types.TableInput{ + Name: &tableName, + TableType: aws.String(ExternalTable), + Parameters: newParameters, + StorageDescriptor: &types.StorageDescriptor{ + Columns: toColumns(updatedStageTable.Metadata()), + Location: aws.String(updatedStageTable.MetadataLocation()), + }, + Description: aws.String(updatedStageTable.Properties()["Description"]), + }, + SkipArchive: aws.Bool(c.props.GetBool(SkipArchive, SkipArchiveDefault)), + VersionId: glueTableVersionID, + } + + _, err := c.glueSvc.UpdateTable(ctx, updateTableInput) + if err != nil { + if errors.Is(err, &types.EntityNotFoundException{}) { + return nil, "", fmt.Errorf("table does not exist: %s.%s (Glue table version %s)", + database, tableName, *glueTableVersionID) + } else if errors.Is(err, &types.ConcurrentModificationException{}) { + return nil, "", fmt.Errorf("cannot commit %s.%s because Glue detected concurrent update to table version %s", + database, tableName, *glueTableVersionID) + } + + return nil, "", err + } + } else { + // TODO finish + } + + // TODO finish + return nil, "", nil +} + +func (c *Catalog) updateAndStageTable(ctx context.Context, currentTable *table.Table, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (*table.Table, error) { + var metadata table.Metadata + newMetadataVersion := 0 + + if currentTable != nil { + metadata = currentTable.Metadata() + newMetadataVersion = metadata.Version() + 1 + } + + for _, req := range requirements { + err := req.Validate(metadata) + if err != nil { + return nil, err + } + } + + updatedMetadata, err := table.UpdateTableMetadata(metadata, updates) + if err != nil { + return nil, err + } + + provider, err := table.LoadLocationProvider(updatedMetadata.Location(), updatedMetadata.Properties()) + if err != nil { + return nil, err + } + + newMetadataLocation, err := provider.NewTableMetadataFileLocation(newMetadataVersion) + if err != nil { + return nil, err + } + + iofs, err := io.LoadFS(ctx, updatedMetadata.Properties(), newMetadataLocation) + if err != nil { + return nil, fmt.Errorf("failed to load table %s: %w", strings.Join(currentTable.Identifier(), "."), err) + } + + return table.New(identifier, updatedMetadata, newMetadataLocation, iofs, c), nil +} + +func (c *Catalog) glueToIcebergTable(ctx context.Context, gTable *types.Table) (*table.Table, error) { + location, ok := gTable.Parameters[metadataLocationPropsKey] + if !ok { + return nil, fmt.Errorf("missing metadata location for table %s.%s", *gTable.DatabaseName, *gTable.Name) + } + props := gTable.Parameters + ctx = utils.WithAwsConfig(ctx, c.awsCfg) + // TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog. + iofs, err := io.LoadFS(ctx, props, location) + if err != nil { + return nil, fmt.Errorf("failed to load table %s.%s: %w", *gTable.DatabaseName, *gTable.Name, err) + } + + return table.NewFromLocation( + TableIdentifier(*gTable.DatabaseName, *gTable.Name), + location, iofs, c) +} + // RegisterTable registers a new table using existing metadata. func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier, metadataLocation string) (*table.Table, error) { database, tableName, err := identifierToGlueTable(identifier) @@ -331,10 +479,6 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier return c.LoadTable(ctx, identifier, nil) } -func (c *Catalog) CommitTable(context.Context, *table.Table, []table.Requirement, []table.Update) (table.Metadata, string, error) { - panic("commit table not implemented for Glue Catalog") -} - // DropTable deletes an Iceberg table from the Glue catalog. func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier) error { database, tableName, err := identifierToGlueTable(identifier) diff --git a/catalog/glue/options.go b/catalog/glue/options.go index 5d4d6d00b..4c55ca4de 100644 --- a/catalog/glue/options.go +++ b/catalog/glue/options.go @@ -18,11 +18,10 @@ package glue import ( + "github.com/apache/iceberg-go" "github.com/aws/aws-sdk-go-v2/aws" ) -type AwsProperties map[string]string - type Option func(*options) // WithAwsConfig sets the AWS configuration for the catalog. @@ -32,13 +31,13 @@ func WithAwsConfig(cfg aws.Config) Option { } } -func WithAwsProperties(props AwsProperties) Option { +func WithProperties(props iceberg.Properties) Option { return func(o *options) { - o.awsProperties = props + o.properties = props } } type options struct { - awsConfig aws.Config - awsProperties AwsProperties + awsConfig aws.Config + properties iceberg.Properties } diff --git a/catalog/glue/schema.go b/catalog/glue/schema.go index 27b2f6d64..23af1f464 100644 --- a/catalog/glue/schema.go +++ b/catalog/glue/schema.go @@ -22,8 +22,16 @@ import ( "strings" "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/glue/types" + "golang.org/x/exp/maps" +) + +const ( + icebergFieldID = "iceberg.field.id" + icebergFieldOptional = "iceberg.field.optional" + icebergFieldCurrent = "iceberg.field.current" ) // schemaToGlueColumns converts an Iceberg schema to a list of Glue columns. @@ -107,3 +115,36 @@ func icebergTypeToGlueType(typ iceberg.Type) string { return "string" } } + +func toColumns(metadata table.Metadata) []types.Column { + result := make(map[string]types.Column) + + appendToResult := func(field iceberg.NestedField, isCurrent bool) { + if _, found := result[field.Name]; found { + return + } + + glueColumn := fieldToGlueColumn(field) + glueColumn.Parameters[icebergFieldID] = fmt.Sprintf("%d", field.ID) + glueColumn.Parameters[icebergFieldOptional] = fmt.Sprintf("%t", field.Required) + glueColumn.Parameters[icebergFieldCurrent] = fmt.Sprintf("%t", isCurrent) + result[field.Name] = glueColumn + } + + currentSchema := metadata.CurrentSchema() + for _, field := range currentSchema.Fields() { + appendToResult(field, true) + } + + for _, schema := range metadata.Schemas() { + if schema.ID == metadata.CurrentSchema().ID { + continue + } + + for _, field := range schema.Fields() { + appendToResult(field, false) + } + } + + return maps.Values(result) +} diff --git a/table/updates.go b/table/updates.go index 4225a4009..d91286ba8 100644 --- a/table/updates.go +++ b/table/updates.go @@ -390,3 +390,72 @@ func NewRemoveSnapshotRefUpdate(ref string) *removeSnapshotRefUpdate { func (u *removeSnapshotRefUpdate) Apply(builder *MetadataBuilder) error { return fmt.Errorf("%w: remove-snapshot-ref", iceberg.ErrNotImplemented) } + +// type MetadataUpdateCtx struct { +// updates []Update +// } + +// func (m *MetadataUpdateCtx) AddUpdate(update Update) { +// m.updates = append(m.updates, update) +// } + +// func (m *MetadataUpdateCtx) IsAddedSnapshot(snapshotID int64) bool { +// for _, update := range m.updates { +// addSnapshotUpdate, yes := update.(*addSnapshotUpdate) +// if yes && addSnapshotUpdate.Snapshot.SnapshotID == snapshotID { +// return true +// } +// } + +// return false +// } + +// func (m *MetadataUpdateCtx) IsAddedSchema(schemaID int) bool { +// for _, update := range m.updates { +// addSchemaUpdate, yes := update.(*addSchemaUpdate) +// if yes && addSchemaUpdate.Schema.ID == schemaID { +// return true +// } +// } + +// return false +// } + +// func (m *MetadataUpdateCtx) IsAddedPartitionSpec(specID int) bool { +// for _, update := range m.updates { +// addPartitionSpecUpdate, yes := update.(*addPartitionSpecUpdate) +// if yes && addPartitionSpecUpdate.Spec.ID() == specID { +// return true +// } +// } + +// return false +// } + +// func (m *MetadataUpdateCtx) IsAddedSortOrder(sortOrderID int) bool { +// for _, update := range m.updates { +// addSortOrderUpdate, yes := update.(*addSortOrderUpdate) +// if yes && addSortOrderUpdate.SortOrder.OrderID == sortOrderID { +// return true +// } +// } + +// return false +// } + +// func (m *MetadataUpdateCtx) HasChanges() bool { +// return len(m.updates) > 0 +// } + +// Update the table metada with the given updaes in one transactions +// +// Returns metadata with updates applied +func UpdateTableMetadata(baseMetadata Metadata, updates []Update) (Metadata, error) { + newMetadataBuilder, err := MetadataBuilderFromBase(baseMetadata) + if err != nil { + return nil, err + } + newMetadataBuilder.updates = updates + + return newMetadataBuilder.Build() +}