From 87a56ba29fa0014a9632568f131eb143090361ce Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Wed, 14 Jan 2026 19:45:57 -0600 Subject: [PATCH 1/7] Initial support for Hive Catalog. --- catalog/hive/README.md | 191 +++++++ catalog/hive/client.go | 131 +++++ catalog/hive/hive.go | 585 +++++++++++++++++++++ catalog/hive/hive_integration_test.go | 319 ++++++++++++ catalog/hive/hive_test.go | 720 ++++++++++++++++++++++++++ catalog/hive/options.go | 102 ++++ catalog/hive/schema.go | 182 +++++++ cmd/iceberg/main.go | 11 + go.mod | 4 + go.sum | 8 + internal/recipe/docker-compose.yml | 37 ++ 11 files changed, 2290 insertions(+) create mode 100644 catalog/hive/README.md create mode 100644 catalog/hive/client.go create mode 100644 catalog/hive/hive.go create mode 100644 catalog/hive/hive_integration_test.go create mode 100644 catalog/hive/hive_test.go create mode 100644 catalog/hive/options.go create mode 100644 catalog/hive/schema.go diff --git a/catalog/hive/README.md b/catalog/hive/README.md new file mode 100644 index 000000000..d390972b7 --- /dev/null +++ b/catalog/hive/README.md @@ -0,0 +1,191 @@ +# Hive Metastore Catalog + +This package provides a Hive Metastore catalog implementation for Apache Iceberg in Go. + +## Features + +- Full `catalog.Catalog` interface implementation +- Namespace (database) management: create, drop, list, describe +- Table management: create, drop, rename, list, load +- Table metadata commits with optimistic locking +- Support for NOSASL and Kerberos authentication +- Uses the [gohive](https://github.com/beltran/gohive) library for Thrift communication + +## Prerequisites + +### Running Hive Metastore Locally + +Start the Hive Metastore using Docker Compose: + +```bash +cd internal/recipe +docker-compose up -d hive-metastore hiveserver2 +``` + +This starts: +- **hive-metastore** on port `9083` (Thrift RPC) +- **hiveserver2** on port `10002` (Web UI) and `10003` (JDBC) + +Verify the services are running: + +```bash +docker ps | grep hive +``` + +Access the HiveServer2 Web UI at: http://localhost:10002 + +## CLI Usage + +The `iceberg` CLI supports the Hive catalog. Use `--catalog hive` and `--uri thrift://localhost:9083`. 
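+
+All of the commands below take the same `--catalog` and `--uri` flags, so a plain shell variable (nothing Iceberg-specific, assuming a POSIX shell) keeps the invocations shorter:
+
+```bash
+HIVE_ARGS="--catalog hive --uri thrift://localhost:9083"
+go run ./cmd/iceberg list $HIVE_ARGS
+```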
+ +### List Namespaces + +```bash +go run ./cmd/iceberg list --catalog hive --uri thrift://localhost:9083 +``` + +### Create a Namespace + +```bash +# Use /tmp/iceberg-warehouse which is mounted in Docker containers +go run ./cmd/iceberg create namespace --catalog hive --uri thrift://localhost:9083 \ + --description "Test namespace for Iceberg tables" \ + --location-uri /tmp/iceberg-warehouse/test_ns \ + test_ns +``` + +### Describe a Namespace + +```bash +go run ./cmd/iceberg describe namespace --catalog hive --uri thrift://localhost:9083 test_ns +``` + +### Create a Table + +```bash +go run ./cmd/iceberg create table --catalog hive --uri thrift://localhost:9083 \ + --schema '[{"name":"id","type":"long","required":true},{"name":"name","type":"string"},{"name":"created_at","type":"timestamp"}]' \ + --location-uri /tmp/iceberg-warehouse/test_ns/users \ + test_ns.users +``` + +### List Tables in a Namespace + +```bash +go run ./cmd/iceberg list --catalog hive --uri thrift://localhost:9083 test_ns +``` + +### Describe a Table + +```bash +go run ./cmd/iceberg describe table --catalog hive --uri thrift://localhost:9083 test_ns.users +``` + +### Get Table Schema + +```bash +go run ./cmd/iceberg schema --catalog hive --uri thrift://localhost:9083 test_ns.users +``` + +### Get Table Location + +```bash +go run ./cmd/iceberg location --catalog hive --uri thrift://localhost:9083 test_ns.users +``` + +### Set Table Properties + +```bash +go run ./cmd/iceberg properties set table --catalog hive --uri thrift://localhost:9083 \ + test_ns.users write.format.default parquet +``` + +### Get Table Properties + +```bash +go run ./cmd/iceberg properties get table --catalog hive --uri thrift://localhost:9083 test_ns.users +``` + +### Drop a Table + +```bash +go run ./cmd/iceberg drop table --catalog hive --uri thrift://localhost:9083 test_ns.users +``` + +### Drop a Namespace + +```bash +go run ./cmd/iceberg drop namespace --catalog hive --uri thrift://localhost:9083 test_ns +``` + +## Programmatic Usage + +```go +package main + +import ( + "context" + "fmt" + "log" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/catalog/hive" +) + +func main() { + ctx := context.Background() + + // Create catalog + props := iceberg.Properties{ + hive.URI: "thrift://localhost:9083", + hive.Warehouse: "/tmp/iceberg-warehouse", + } + + cat, err := hive.NewCatalog(props) + if err != nil { + log.Fatal(err) + } + defer cat.Close() + + // List namespaces + namespaces, err := cat.ListNamespaces(ctx, nil) + if err != nil { + log.Fatal(err) + } + + fmt.Println("Namespaces:") + for _, ns := range namespaces { + fmt.Printf(" - %v\n", ns) + } + + // Create a namespace + err = cat.CreateNamespace(ctx, hive.DatabaseIdentifier("my_db"), iceberg.Properties{ + "location": "/tmp/iceberg-warehouse/my_db", + "comment": "My test database", + }) + if err != nil { + log.Fatal(err) + } + + // Create a table + schema := iceberg.NewSchemaWithIdentifiers(0, []int{1}, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String}, + ) + + tbl, err := cat.CreateTable(ctx, hive.TableIdentifier("my_db", "my_table"), schema) + if err != nil { + log.Fatal(err) + } + + fmt.Printf("Created table: %s\n", tbl.Identifier()) +} +``` + +## Configuration Properties + +| Property | Description | Default | +|----------|-------------|---------| +| `uri` | Thrift URI for Hive Metastore (e.g., `thrift://localhost:9083`) | Required | +| 
`warehouse` | Default warehouse location for tables | - | +| `hive.kerberos-authentication` | Enable Kerberos authentication | `false` | diff --git a/catalog/hive/client.go b/catalog/hive/client.go new file mode 100644 index 000000000..10e4d0a10 --- /dev/null +++ b/catalog/hive/client.go @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package hive + +import ( + "context" + "fmt" + "net/url" + "strconv" + + "github.com/beltran/gohive" + "github.com/beltran/gohive/hive_metastore" +) + +// HiveClient interface for Hive Metastore operations. +// This allows for mocking in tests. +type HiveClient interface { + Close() + + GetDatabase(ctx context.Context, name string) (*hive_metastore.Database, error) + CreateDatabase(ctx context.Context, database *hive_metastore.Database) error + AlterDatabase(ctx context.Context, dbname string, db *hive_metastore.Database) error + DropDatabase(ctx context.Context, name string, deleteData, cascade bool) error + GetAllDatabases(ctx context.Context) ([]string, error) + + GetTable(ctx context.Context, dbName, tableName string) (*hive_metastore.Table, error) + CreateTable(ctx context.Context, tbl *hive_metastore.Table) error + AlterTable(ctx context.Context, dbName, tableName string, newTbl *hive_metastore.Table) error + DropTable(ctx context.Context, dbName, tableName string, deleteData bool) error + GetTables(ctx context.Context, dbName, pattern string) ([]string, error) +} + +// thriftClient wraps the gohive HiveMetastoreClient. +type thriftClient struct { + client *gohive.HiveMetastoreClient +} + +// NewHiveClient creates a new Hive Metastore client using gohive. 
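+// The URI is expected to be of the form thrift://host:port; when the port is
+// omitted, 9083 is used. Authentication is NOSASL unless opts.KerberosAuth is
+// set, in which case KERBEROS is requested.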
+func NewHiveClient(uri string, opts *HiveOptions) (HiveClient, error) { + parsed, err := url.Parse(uri) + if err != nil { + return nil, fmt.Errorf("invalid URI: %w", err) + } + + host := parsed.Hostname() + portStr := parsed.Port() + if portStr == "" { + portStr = "9083" + } + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, fmt.Errorf("invalid port: %w", err) + } + + // Determine authentication mode + auth := "NOSASL" + if opts != nil && opts.KerberosAuth { + auth = "KERBEROS" + } + + config := gohive.NewMetastoreConnectConfiguration() + config.TransportMode = "binary" + + client, err := gohive.ConnectToMetastore(host, port, auth, config) + if err != nil { + return nil, fmt.Errorf("failed to connect to metastore: %w", err) + } + + return &thriftClient{client: client}, nil +} + +func (c *thriftClient) Close() { + if c.client != nil { + c.client.Close() + } +} + +func (c *thriftClient) GetDatabase(ctx context.Context, name string) (*hive_metastore.Database, error) { + return c.client.Client.GetDatabase(ctx, name) +} + +func (c *thriftClient) CreateDatabase(ctx context.Context, database *hive_metastore.Database) error { + return c.client.Client.CreateDatabase(ctx, database) +} + +func (c *thriftClient) AlterDatabase(ctx context.Context, dbname string, db *hive_metastore.Database) error { + return c.client.Client.AlterDatabase(ctx, dbname, db) +} + +func (c *thriftClient) DropDatabase(ctx context.Context, name string, deleteData, cascade bool) error { + return c.client.Client.DropDatabase(ctx, name, deleteData, cascade) +} + +func (c *thriftClient) GetAllDatabases(ctx context.Context) ([]string, error) { + return c.client.Client.GetAllDatabases(ctx) +} + +func (c *thriftClient) GetTable(ctx context.Context, dbName, tableName string) (*hive_metastore.Table, error) { + return c.client.Client.GetTable(ctx, dbName, tableName) +} + +func (c *thriftClient) CreateTable(ctx context.Context, tbl *hive_metastore.Table) error { + return c.client.Client.CreateTable(ctx, tbl) +} + +func (c *thriftClient) AlterTable(ctx context.Context, dbName, tableName string, newTbl *hive_metastore.Table) error { + return c.client.Client.AlterTable(ctx, dbName, tableName, newTbl) +} + +func (c *thriftClient) DropTable(ctx context.Context, dbName, tableName string, deleteData bool) error { + return c.client.Client.DropTable(ctx, dbName, tableName, deleteData) +} + +func (c *thriftClient) GetTables(ctx context.Context, dbName, pattern string) ([]string, error) { + return c.client.Client.GetTables(ctx, dbName, pattern) +} diff --git a/catalog/hive/hive.go b/catalog/hive/hive.go new file mode 100644 index 000000000..a34f955b8 --- /dev/null +++ b/catalog/hive/hive.go @@ -0,0 +1,585 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package hive + +import ( + "context" + "errors" + "fmt" + "iter" + "maps" + "strings" + _ "unsafe" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/catalog" + "github.com/apache/iceberg-go/catalog/internal" + "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/table" + "github.com/beltran/gohive/hive_metastore" +) + +const ( + // Property keys for namespace properties + locationKey = "location" + commentKey = "comment" + descriptionKey = "description" +) + +var _ catalog.Catalog = (*Catalog)(nil) + +func init() { + catalog.Register("hive", catalog.RegistrarFunc(func(ctx context.Context, _ string, props iceberg.Properties) (catalog.Catalog, error) { + return NewCatalog(props) + })) +} + +// Catalog implements the catalog.Catalog interface for Hive Metastore. +type Catalog struct { + client HiveClient + opts *HiveOptions +} + +// NewCatalog creates a new Hive Metastore catalog. +func NewCatalog(props iceberg.Properties, opts ...Option) (*Catalog, error) { + o := NewHiveOptions() + o.ApplyProperties(props) + + for _, opt := range opts { + opt(o) + } + + if o.URI == "" { + return nil, errors.New("hive.uri is required") + } + + client, err := NewHiveClient(o.URI, o) + if err != nil { + return nil, fmt.Errorf("failed to create Hive client: %w", err) + } + + return &Catalog{ + client: client, + opts: o, + }, nil +} + +// NewCatalogWithClient creates a new Hive Metastore catalog with a custom client. +// This is useful for testing with mock clients. +func NewCatalogWithClient(client HiveClient, props iceberg.Properties) *Catalog { + o := NewHiveOptions() + o.ApplyProperties(props) + return &Catalog{ + client: client, + opts: o, + } +} + +// CatalogType returns the type of the catalog. +func (c *Catalog) CatalogType() catalog.Type { + return catalog.Hive +} + +// Close closes the connection to the Hive Metastore. +func (c *Catalog) Close() error { + c.client.Close() + return nil +} + +// ListTables returns a list of table identifiers in the given namespace. +func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) iter.Seq2[table.Identifier, error] { + return func(yield func(table.Identifier, error) bool) { + database, err := identifierToDatabase(namespace) + if err != nil { + yield(nil, err) + return + } + + // Get all table names in the database + tableNames, err := c.client.GetTables(ctx, database, "*") + if err != nil { + yield(nil, fmt.Errorf("failed to list tables in %s: %w", database, err)) + return + } + + if len(tableNames) == 0 { + return + } + + // Check each table to see if it's an Iceberg table + for _, tableName := range tableNames { + tbl, err := c.client.GetTable(ctx, database, tableName) + if err != nil { + // Skip tables we can't read + continue + } + if isIcebergTable(tbl) { + if !yield(TableIdentifier(database, tableName), nil) { + return + } + } + } + } +} + +// LoadTable loads a table from the catalog. 
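+// The Iceberg metadata location is read from the Hive table's metadata_location
+// parameter and the metadata file is loaded through the configured FileIO.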
+func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) { + database, tableName, err := identifierToTableName(identifier) + if err != nil { + return nil, err + } + + hiveTbl, err := c.getIcebergTable(ctx, database, tableName) + if err != nil { + return nil, err + } + + metadataLocation, err := getMetadataLocation(hiveTbl) + if err != nil { + return nil, fmt.Errorf("failed to get metadata location: %w", err) + } + + return table.NewFromLocation( + ctx, + identifier, + metadataLocation, + io.LoadFSFunc(c.opts.props, metadataLocation), + c, + ) +} + +// CreateTable creates a new table in the catalog. +func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) { + staged, err := internal.CreateStagedTable(ctx, c.opts.props, c.LoadNamespaceProperties, identifier, schema, opts...) + if err != nil { + return nil, err + } + + database, tableName, err := identifierToTableName(identifier) + if err != nil { + return nil, err + } + + // Get the filesystem for writing metadata + afs, err := staged.FS(ctx) + if err != nil { + return nil, err + } + wfs, ok := afs.(io.WriteFileIO) + if !ok { + return nil, errors.New("loaded filesystem IO does not support writing") + } + + // Write the metadata file + compression := staged.Table.Properties().Get(table.MetadataCompressionKey, table.MetadataCompressionDefault) + if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation(), compression); err != nil { + return nil, err + } + + // Create the Hive table + hiveTbl := constructHiveTable(database, tableName, staged.Table.Location(), staged.MetadataLocation(), schema, staged.Table.Properties()) + + if err := c.client.CreateTable(ctx, hiveTbl); err != nil { + if isAlreadyExistsError(err) { + return nil, fmt.Errorf("%w: %s.%s", catalog.ErrTableAlreadyExists, database, tableName) + } + return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err) + } + + return c.LoadTable(ctx, identifier) +} + +// DropTable drops a table from the catalog. +func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier) error { + database, tableName, err := identifierToTableName(identifier) + if err != nil { + return err + } + + // Verify it's an Iceberg table + if _, err := c.getIcebergTable(ctx, database, tableName); err != nil { + return err + } + + // Drop the table (deleteData=false for external tables) + if err := c.client.DropTable(ctx, database, tableName, false); err != nil { + return fmt.Errorf("failed to drop table %s.%s: %w", database, tableName, err) + } + + return nil +} + +// RenameTable renames a table in the catalog. 
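+// The destination namespace must already exist. Only the Hive Metastore entry
+// is altered; the Iceberg metadata location parameter is left unchanged.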
+func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) { + fromDB, fromTable, err := identifierToTableName(from) + if err != nil { + return nil, err + } + + toDB, toTable, err := identifierToTableName(to) + if err != nil { + return nil, err + } + + // Check that target namespace exists + exists, err := c.CheckNamespaceExists(ctx, DatabaseIdentifier(toDB)) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, toDB) + } + + // Get the existing table + hiveTbl, err := c.getIcebergTable(ctx, fromDB, fromTable) + if err != nil { + return nil, err + } + + // Update table name and database + hiveTbl.TableName = toTable + hiveTbl.DbName = toDB + + // Alter the table to rename it + if err := c.client.AlterTable(ctx, fromDB, fromTable, hiveTbl); err != nil { + return nil, fmt.Errorf("failed to rename table %s.%s to %s.%s: %w", fromDB, fromTable, toDB, toTable, err) + } + + return c.LoadTable(ctx, to) +} + +// CommitTable commits updates to a table. +func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) { + database, tableName, err := identifierToTableName(identifier) + if err != nil { + return nil, "", err + } + + // Load current table state + currentHiveTbl, err := c.client.GetTable(ctx, database, tableName) + if err != nil && !isNoSuchObjectError(err) { + return nil, "", err + } + + var current *table.Table + if currentHiveTbl != nil && isIcebergTable(currentHiveTbl) { + metadataLoc, err := getMetadataLocation(currentHiveTbl) + if err != nil { + return nil, "", err + } + current, err = table.NewFromLocation(ctx, identifier, metadataLoc, io.LoadFSFunc(c.opts.props, metadataLoc), c) + if err != nil { + return nil, "", err + } + } + + // Create staged table with updates + staged, err := internal.UpdateAndStageTable(ctx, current, identifier, requirements, updates, c) + if err != nil { + return nil, "", err + } + + // Check if there are actual changes + if current != nil && staged.Metadata().Equals(current.Metadata()) { + return current.Metadata(), current.MetadataLocation(), nil + } + + // Write new metadata + if err := internal.WriteMetadata(ctx, staged.Metadata(), staged.MetadataLocation(), staged.Properties()); err != nil { + return nil, "", err + } + + // Update Hive table + if current != nil { + updatedHiveTbl := updateHiveTableForCommit(currentHiveTbl, staged.MetadataLocation()) + + if err := c.client.AlterTable(ctx, database, tableName, updatedHiveTbl); err != nil { + return nil, "", fmt.Errorf("failed to commit table %s.%s: %w", database, tableName, err) + } + } else { + // Create new table + hiveTbl := constructHiveTable(database, tableName, staged.Table.Location(), staged.MetadataLocation(), staged.Metadata().CurrentSchema(), staged.Properties()) + if err := c.client.CreateTable(ctx, hiveTbl); err != nil { + return nil, "", fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err) + } + } + + return staged.Metadata(), staged.MetadataLocation(), nil +} + +// CheckTableExists checks if a table exists in the catalog. 
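+// A Hive table that exists but lacks the Iceberg table_type parameter is
+// reported as not existing.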
+func (c *Catalog) CheckTableExists(ctx context.Context, identifier table.Identifier) (bool, error) { + database, tableName, err := identifierToTableName(identifier) + if err != nil { + return false, err + } + + hiveTbl, err := c.client.GetTable(ctx, database, tableName) + if err != nil { + if isNoSuchObjectError(err) { + return false, nil + } + return false, err + } + + return isIcebergTable(hiveTbl), nil +} + +// Namespace operations + +// ListNamespaces returns a list of namespaces in the catalog. +func (c *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) { + // Hive doesn't support hierarchical namespaces + if parent != nil && len(parent) > 0 { + return nil, errors.New("hierarchical namespace is not supported") + } + + databases, err := c.client.GetAllDatabases(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list namespaces: %w", err) + } + + namespaces := make([]table.Identifier, len(databases)) + for i, db := range databases { + namespaces[i] = DatabaseIdentifier(db) + } + + return namespaces, nil +} + +// CreateNamespace creates a new namespace in the catalog. +func (c *Catalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error { + database, err := identifierToDatabase(namespace) + if err != nil { + return err + } + + db := &hive_metastore.Database{ + Name: database, + Parameters: make(map[string]string), + } + + for k, v := range props { + switch k { + case locationKey, "Location": + db.LocationUri = v + case commentKey, descriptionKey, "Description": + db.Description = v + default: + db.Parameters[k] = v + } + } + + if err := c.client.CreateDatabase(ctx, db); err != nil { + if isAlreadyExistsError(err) { + return fmt.Errorf("%w: %s", catalog.ErrNamespaceAlreadyExists, database) + } + return fmt.Errorf("failed to create namespace %s: %w", database, err) + } + + return nil +} + +// DropNamespace drops a namespace from the catalog. +func (c *Catalog) DropNamespace(ctx context.Context, namespace table.Identifier) error { + database, err := identifierToDatabase(namespace) + if err != nil { + return err + } + + // Check if namespace exists + _, err = c.client.GetDatabase(ctx, database) + if err != nil { + if isNoSuchObjectError(err) { + return fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, database) + } + return err + } + + // Drop database (cascade=false to fail if not empty) + if err := c.client.DropDatabase(ctx, database, false, false); err != nil { + if isInvalidOperationError(err) { + return fmt.Errorf("%w: %s", catalog.ErrNamespaceNotEmpty, database) + } + return fmt.Errorf("failed to drop namespace %s: %w", database, err) + } + + return nil +} + +// CheckNamespaceExists checks if a namespace exists in the catalog. +func (c *Catalog) CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error) { + database, err := identifierToDatabase(namespace) + if err != nil { + return false, err + } + + _, err = c.client.GetDatabase(ctx, database) + if err != nil { + if isNoSuchObjectError(err) { + return false, nil + } + return false, err + } + + return true, nil +} + +// LoadNamespaceProperties loads the properties for a namespace. 
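+// The database's location URI and description are exposed as the "location" and
+// "comment" properties, alongside any custom database parameters.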
+func (c *Catalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) { + database, err := identifierToDatabase(namespace) + if err != nil { + return nil, err + } + + db, err := c.client.GetDatabase(ctx, database) + if err != nil { + if isNoSuchObjectError(err) { + return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, database) + } + return nil, fmt.Errorf("failed to get namespace %s: %w", database, err) + } + + props := make(iceberg.Properties) + if db.Parameters != nil { + maps.Copy(props, db.Parameters) + } + if db.LocationUri != "" { + props[locationKey] = db.LocationUri + } + if db.Description != "" { + props[commentKey] = db.Description + } + + return props, nil +} + +// avoid circular dependency +// +//go:linkname getUpdatedPropsAndUpdateSummary github.com/apache/iceberg-go/catalog.getUpdatedPropsAndUpdateSummary +func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals []string, updates iceberg.Properties) (iceberg.Properties, catalog.PropertiesUpdateSummary, error) + +// UpdateNamespaceProperties updates the properties for a namespace. +func (c *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier, + removals []string, updates iceberg.Properties, +) (catalog.PropertiesUpdateSummary, error) { + currentProps, err := c.LoadNamespaceProperties(ctx, namespace) + if err != nil { + return catalog.PropertiesUpdateSummary{}, err + } + + updatedProperties, propertiesUpdateSummary, err := getUpdatedPropsAndUpdateSummary(currentProps, removals, updates) + if err != nil { + return catalog.PropertiesUpdateSummary{}, err + } + + database, _ := identifierToDatabase(namespace) + + db := &hive_metastore.Database{ + Name: database, + Parameters: make(map[string]string), + } + + for k, v := range updatedProperties { + switch k { + case locationKey, "Location": + db.LocationUri = v + case commentKey, descriptionKey, "Description": + db.Description = v + default: + db.Parameters[k] = v + } + } + + if err := c.client.AlterDatabase(ctx, database, db); err != nil { + return catalog.PropertiesUpdateSummary{}, fmt.Errorf("failed to update namespace properties %s: %w", database, err) + } + + return propertiesUpdateSummary, nil +} + +// getIcebergTable retrieves a table and validates it's an Iceberg table. +func (c *Catalog) getIcebergTable(ctx context.Context, database, tableName string) (*hive_metastore.Table, error) { + hiveTbl, err := c.client.GetTable(ctx, database, tableName) + if err != nil { + if isNoSuchObjectError(err) { + return nil, fmt.Errorf("%w: %s.%s", catalog.ErrNoSuchTable, database, tableName) + } + return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) + } + + if !isIcebergTable(hiveTbl) { + return nil, fmt.Errorf("table %s.%s is not an Iceberg table", database, tableName) + } + + return hiveTbl, nil +} + +func identifierToTableName(identifier table.Identifier) (string, string, error) { + if len(identifier) != 2 { + return "", "", fmt.Errorf("invalid identifier, expected [database, table]: %v", identifier) + } + return identifier[0], identifier[1], nil +} + +func identifierToDatabase(identifier table.Identifier) (string, error) { + if len(identifier) != 1 { + return "", fmt.Errorf("invalid identifier, expected [database]: %v", identifier) + } + return identifier[0], nil +} + +// TableIdentifier returns a table identifier for a Hive table. 
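+// The result is the two-element identifier {database, table} accepted by the
+// table-level catalog methods.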
+func TableIdentifier(database, tableName string) table.Identifier { + return []string{database, tableName} +} + +// DatabaseIdentifier returns a database identifier for a Hive database. +func DatabaseIdentifier(database string) table.Identifier { + return []string{database} +} + +// Error checking helpers for Hive Metastore exceptions + +func isNoSuchObjectError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return strings.Contains(errStr, "NoSuchObjectException") || + strings.Contains(errStr, "not found") || + strings.Contains(errStr, "does not exist") +} + +func isAlreadyExistsError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return strings.Contains(errStr, "AlreadyExistsException") || + strings.Contains(errStr, "already exists") +} + +func isInvalidOperationError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return strings.Contains(errStr, "InvalidOperationException") || + strings.Contains(errStr, "is not empty") +} diff --git a/catalog/hive/hive_integration_test.go b/catalog/hive/hive_integration_test.go new file mode 100644 index 000000000..046c46616 --- /dev/null +++ b/catalog/hive/hive_integration_test.go @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build integration + +package hive + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/catalog" + "github.com/stretchr/testify/require" +) + +// Integration tests for the Hive Metastore catalog. +// These tests require a running Hive Metastore instance. +// +// To run these tests: +// 1. Start the Hive Metastore using Docker: +// cd internal/recipe && docker-compose up -d hive-metastore +// +// 2. Set the required environment variables: +// export TEST_HIVE_URI=thrift://localhost:9083 +// export TEST_HIVE_DATABASE=test_db +// export TEST_TABLE_LOCATION=/tmp/iceberg/warehouse +// +// 3. Run the tests: +// go test -tags=integration -v ./catalog/hive/... 
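+//
+// The defaults below target the docker-compose metastore at thrift://localhost:9083;
+// the environment variables only need to be set to test against a different deployment.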
+ +func getTestHiveURI() string { + uri := os.Getenv("TEST_HIVE_URI") + if uri == "" { + return "thrift://localhost:9083" + } + return uri +} + +func getTestDatabase() string { + db := os.Getenv("TEST_HIVE_DATABASE") + if db == "" { + return "test_iceberg_db" + } + return db +} + +func getTestTableLocation() string { + loc := os.Getenv("TEST_TABLE_LOCATION") + if loc == "" { + return "/tmp/iceberg/warehouse" + } + return loc +} + +func createTestCatalog(t *testing.T) *Catalog { + t.Helper() + + props := iceberg.Properties{ + URI: getTestHiveURI(), + Warehouse: getTestTableLocation(), + } + + cat, err := NewCatalog(props) + require.NoError(t, err) + + return cat +} + +func TestHiveIntegrationListNamespaces(t *testing.T) { + assert := require.New(t) + + cat := createTestCatalog(t) + defer cat.Close() + + namespaces, err := cat.ListNamespaces(context.TODO(), nil) + assert.NoError(err) + assert.NotNil(namespaces) + + t.Logf("Found %d namespaces", len(namespaces)) + for _, ns := range namespaces { + t.Logf(" - %v", ns) + } +} + +func TestHiveIntegrationCreateAndDropNamespace(t *testing.T) { + assert := require.New(t) + + cat := createTestCatalog(t) + defer cat.Close() + + dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano()) + + // Create namespace + props := iceberg.Properties{ + "comment": "Test database for integration tests", + "location": getTestTableLocation() + "/" + dbName, + } + + err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), props) + assert.NoError(err) + + // Check it exists + exists, err := cat.CheckNamespaceExists(context.TODO(), DatabaseIdentifier(dbName)) + assert.NoError(err) + assert.True(exists) + + // Load properties + loadedProps, err := cat.LoadNamespaceProperties(context.TODO(), DatabaseIdentifier(dbName)) + assert.NoError(err) + assert.Equal("Test database for integration tests", loadedProps["comment"]) + + // Drop namespace + err = cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName)) + assert.NoError(err) + + // Verify it's gone + exists, err = cat.CheckNamespaceExists(context.TODO(), DatabaseIdentifier(dbName)) + assert.NoError(err) + assert.False(exists) +} + +func TestHiveIntegrationUpdateNamespaceProperties(t *testing.T) { + assert := require.New(t) + + cat := createTestCatalog(t) + defer cat.Close() + + dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano()) + + // Create namespace with initial properties + initialProps := iceberg.Properties{ + "key1": "value1", + "key2": "value2", + } + + err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), initialProps) + assert.NoError(err) + defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName)) + + // Update properties + updates := iceberg.Properties{ + "key2": "updated_value2", + "key3": "value3", + } + removals := []string{"key1"} + + summary, err := cat.UpdateNamespaceProperties(context.TODO(), DatabaseIdentifier(dbName), removals, updates) + assert.NoError(err) + assert.Contains(summary.Removed, "key1") + assert.Contains(summary.Updated, "key2") + assert.Contains(summary.Updated, "key3") + + // Verify updates + props, err := cat.LoadNamespaceProperties(context.TODO(), DatabaseIdentifier(dbName)) + assert.NoError(err) + assert.Equal("updated_value2", props["key2"]) + assert.Equal("value3", props["key3"]) + _, exists := props["key1"] + assert.False(exists) +} + +func TestHiveIntegrationCreateAndListTables(t *testing.T) { + assert := require.New(t) + + cat := createTestCatalog(t) + defer cat.Close() + + dbName := fmt.Sprintf("test_db_%d", 
time.Now().UnixNano()) + tableName := "test_table" + + // Create namespace + err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), iceberg.Properties{ + "location": getTestTableLocation() + "/" + dbName, + }) + assert.NoError(err) + defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName)) + + // Create table + schema := iceberg.NewSchemaWithIdentifiers(0, []int{1}, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String}, + ) + + tableLocation := getTestTableLocation() + "/" + dbName + "/" + tableName + tbl, err := cat.CreateTable(context.TODO(), TableIdentifier(dbName, tableName), schema, + catalog.WithLocation(tableLocation), + ) + assert.NoError(err) + assert.NotNil(tbl) + defer cat.DropTable(context.TODO(), TableIdentifier(dbName, tableName)) + + // Verify table exists + exists, err := cat.CheckTableExists(context.TODO(), TableIdentifier(dbName, tableName)) + assert.NoError(err) + assert.True(exists) + + // List tables + tables := make([][]string, 0) + for tblIdent, err := range cat.ListTables(context.TODO(), DatabaseIdentifier(dbName)) { + assert.NoError(err) + tables = append(tables, tblIdent) + } + assert.Len(tables, 1) + assert.Equal([]string{dbName, tableName}, tables[0]) + + // Load table + loadedTable, err := cat.LoadTable(context.TODO(), TableIdentifier(dbName, tableName)) + assert.NoError(err) + assert.NotNil(loadedTable) + assert.True(schema.Equals(loadedTable.Schema())) +} + +func TestHiveIntegrationRenameTable(t *testing.T) { + assert := require.New(t) + + cat := createTestCatalog(t) + defer cat.Close() + + dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano()) + oldTableName := "old_table" + newTableName := "new_table" + + // Create namespace + err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), iceberg.Properties{ + "location": getTestTableLocation() + "/" + dbName, + }) + assert.NoError(err) + defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName)) + + // Create table + schema := iceberg.NewSchemaWithIdentifiers(0, []int{1}, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + ) + + tableLocation := getTestTableLocation() + "/" + dbName + "/" + oldTableName + _, err = cat.CreateTable(context.TODO(), TableIdentifier(dbName, oldTableName), schema, + catalog.WithLocation(tableLocation), + ) + assert.NoError(err) + + // Rename table + renamedTable, err := cat.RenameTable(context.TODO(), + TableIdentifier(dbName, oldTableName), + TableIdentifier(dbName, newTableName), + ) + assert.NoError(err) + assert.NotNil(renamedTable) + defer cat.DropTable(context.TODO(), TableIdentifier(dbName, newTableName)) + + // Verify old table doesn't exist + exists, err := cat.CheckTableExists(context.TODO(), TableIdentifier(dbName, oldTableName)) + assert.NoError(err) + assert.False(exists) + + // Verify new table exists + exists, err = cat.CheckTableExists(context.TODO(), TableIdentifier(dbName, newTableName)) + assert.NoError(err) + assert.True(exists) +} + +func TestHiveIntegrationDropTable(t *testing.T) { + assert := require.New(t) + + cat := createTestCatalog(t) + defer cat.Close() + + dbName := fmt.Sprintf("test_db_%d", time.Now().UnixNano()) + tableName := "table_to_drop" + + // Create namespace + err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), iceberg.Properties{ + "location": getTestTableLocation() + "/" + dbName, + }) + assert.NoError(err) + 
defer cat.DropNamespace(context.TODO(), DatabaseIdentifier(dbName)) + + // Create table + schema := iceberg.NewSchemaWithIdentifiers(0, []int{1}, + iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + ) + + tableLocation := getTestTableLocation() + "/" + dbName + "/" + tableName + _, err = cat.CreateTable(context.TODO(), TableIdentifier(dbName, tableName), schema, + catalog.WithLocation(tableLocation), + ) + assert.NoError(err) + + // Verify table exists + exists, err := cat.CheckTableExists(context.TODO(), TableIdentifier(dbName, tableName)) + assert.NoError(err) + assert.True(exists) + + // Drop table + err = cat.DropTable(context.TODO(), TableIdentifier(dbName, tableName)) + assert.NoError(err) + + // Verify table is gone + exists, err = cat.CheckTableExists(context.TODO(), TableIdentifier(dbName, tableName)) + assert.NoError(err) + assert.False(exists) +} diff --git a/catalog/hive/hive_test.go b/catalog/hive/hive_test.go new file mode 100644 index 000000000..5712d48c6 --- /dev/null +++ b/catalog/hive/hive_test.go @@ -0,0 +1,720 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package hive + +import ( + "context" + "errors" + "testing" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/catalog" + "github.com/apache/iceberg-go/table" + "github.com/beltran/gohive/hive_metastore" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// mockHiveClient is a mock implementation of HiveClient for testing. 
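+// Expectations are configured with testify's On(...).Return(...) calls and
+// verified with AssertExpectations at the end of each test.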
+type mockHiveClient struct { + mock.Mock +} + +func (m *mockHiveClient) GetDatabase(ctx context.Context, name string) (*hive_metastore.Database, error) { + args := m.Called(ctx, name) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*hive_metastore.Database), args.Error(1) +} + +func (m *mockHiveClient) GetAllDatabases(ctx context.Context) ([]string, error) { + args := m.Called(ctx) + return args.Get(0).([]string), args.Error(1) +} + +func (m *mockHiveClient) CreateDatabase(ctx context.Context, database *hive_metastore.Database) error { + args := m.Called(ctx, database) + return args.Error(0) +} + +func (m *mockHiveClient) DropDatabase(ctx context.Context, name string, deleteData, cascade bool) error { + args := m.Called(ctx, name, deleteData, cascade) + return args.Error(0) +} + +func (m *mockHiveClient) AlterDatabase(ctx context.Context, name string, database *hive_metastore.Database) error { + args := m.Called(ctx, name, database) + return args.Error(0) +} + +func (m *mockHiveClient) GetTable(ctx context.Context, dbName, tableName string) (*hive_metastore.Table, error) { + args := m.Called(ctx, dbName, tableName) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*hive_metastore.Table), args.Error(1) +} + +func (m *mockHiveClient) GetTables(ctx context.Context, dbName, pattern string) ([]string, error) { + args := m.Called(ctx, dbName, pattern) + return args.Get(0).([]string), args.Error(1) +} + +func (m *mockHiveClient) CreateTable(ctx context.Context, tbl *hive_metastore.Table) error { + args := m.Called(ctx, tbl) + return args.Error(0) +} + +func (m *mockHiveClient) DropTable(ctx context.Context, dbName, tableName string, deleteData bool) error { + args := m.Called(ctx, dbName, tableName, deleteData) + return args.Error(0) +} + +func (m *mockHiveClient) AlterTable(ctx context.Context, dbName, tableName string, newTable *hive_metastore.Table) error { + args := m.Called(ctx, dbName, tableName, newTable) + return args.Error(0) +} + +func (m *mockHiveClient) Close() { + m.Called() +} + +// Test data + +var testIcebergHiveTable1 = &hive_metastore.Table{ + TableName: "test_table", + DbName: "test_database", + TableType: TableTypeExternalTable, + Parameters: map[string]string{ + TableTypeKey: TableTypeIceberg, + MetadataLocationKey: "s3://test-bucket/test_table/metadata/abc123-123.metadata.json", + }, + Sd: &hive_metastore.StorageDescriptor{ + Location: "s3://test-bucket/test_table", + }, +} + +var testIcebergHiveTable2 = &hive_metastore.Table{ + TableName: "test_table2", + DbName: "test_database", + TableType: TableTypeExternalTable, + Parameters: map[string]string{ + TableTypeKey: TableTypeIceberg, + MetadataLocationKey: "s3://test-bucket/test_table2/metadata/abc456-456.metadata.json", + }, + Sd: &hive_metastore.StorageDescriptor{ + Location: "s3://test-bucket/test_table2", + }, +} + +var testNonIcebergHiveTable = &hive_metastore.Table{ + TableName: "other_table", + DbName: "test_database", + TableType: TableTypeExternalTable, + Parameters: map[string]string{ + "some_param": "some_value", + }, +} + +var testSchema = iceberg.NewSchemaWithIdentifiers(0, []int{}, + iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool}) + +// Error helpers for mocking + +var errNoSuchObject = errors.New("NoSuchObjectException: object not found") +var 
errAlreadyExists = errors.New("AlreadyExistsException: object already exists") +var errInvalidOperation = errors.New("InvalidOperationException: Database is not empty") + +// Tests + +func TestHiveListTables(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetTables", mock.Anything, "test_database", "*"). + Return([]string{"test_table", "test_table2", "other_table"}, nil).Once() + + // Mock individual GetTable calls for each table + mockClient.On("GetTable", mock.Anything, "test_database", "test_table"). + Return(testIcebergHiveTable1, nil).Once() + mockClient.On("GetTable", mock.Anything, "test_database", "test_table2"). + Return(testIcebergHiveTable2, nil).Once() + mockClient.On("GetTable", mock.Anything, "test_database", "other_table"). + Return(testNonIcebergHiveTable, nil).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + var lastErr error + tbls := make([]table.Identifier, 0) + iter := hiveCatalog.ListTables(context.TODO(), DatabaseIdentifier("test_database")) + + for tbl, err := range iter { + if err != nil { + lastErr = err + break + } + tbls = append(tbls, tbl) + } + + assert.NoError(lastErr) + assert.Len(tbls, 2) // Only Iceberg tables + assert.Contains(tbls, table.Identifier{"test_database", "test_table"}) + assert.Contains(tbls, table.Identifier{"test_database", "test_table2"}) + + mockClient.AssertExpectations(t) +} + +func TestHiveListTablesEmpty(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetTables", mock.Anything, "empty_database", "*"). + Return([]string{}, nil).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + var lastErr error + tbls := make([]table.Identifier, 0) + iter := hiveCatalog.ListTables(context.TODO(), DatabaseIdentifier("empty_database")) + + for tbl, err := range iter { + if err != nil { + lastErr = err + break + } + tbls = append(tbls, tbl) + } + + assert.NoError(lastErr) + assert.Len(tbls, 0) + + mockClient.AssertExpectations(t) +} + +func TestHiveListNamespaces(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetAllDatabases", mock.Anything). 
+ Return([]string{"database1", "database2", "database3"}, nil).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + namespaces, err := hiveCatalog.ListNamespaces(context.TODO(), nil) + assert.NoError(err) + assert.Len(namespaces, 3) + assert.Equal([]string{"database1"}, namespaces[0]) + assert.Equal([]string{"database2"}, namespaces[1]) + assert.Equal([]string{"database3"}, namespaces[2]) + + mockClient.AssertExpectations(t) +} + +func TestHiveListNamespacesHierarchicalError(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + _, err := hiveCatalog.ListNamespaces(context.TODO(), []string{"parent"}) + assert.Error(err) + assert.Contains(err.Error(), "hierarchical namespace is not supported") +} + +func TestHiveCreateNamespace(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("CreateDatabase", mock.Anything, mock.MatchedBy(func(db *hive_metastore.Database) bool { + return db.Name == "new_database" && + db.Description == "Test Description" && + db.LocationUri == "s3://test-location" + })).Return(nil).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + props := map[string]string{ + "comment": "Test Description", + "location": "s3://test-location", + } + + err := hiveCatalog.CreateNamespace(context.TODO(), DatabaseIdentifier("new_database"), props) + assert.NoError(err) + + mockClient.AssertExpectations(t) +} + +func TestHiveCreateNamespaceAlreadyExists(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("CreateDatabase", mock.Anything, mock.Anything). + Return(errAlreadyExists).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + err := hiveCatalog.CreateNamespace(context.TODO(), DatabaseIdentifier("existing_database"), nil) + assert.Error(err) + assert.True(errors.Is(err, catalog.ErrNamespaceAlreadyExists)) + + mockClient.AssertExpectations(t) +} + +func TestHiveDropNamespace(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetDatabase", mock.Anything, "test_namespace"). + Return(&hive_metastore.Database{Name: "test_namespace"}, nil).Once() + + mockClient.On("DropDatabase", mock.Anything, "test_namespace", false, false). + Return(nil).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + err := hiveCatalog.DropNamespace(context.TODO(), DatabaseIdentifier("test_namespace")) + assert.NoError(err) + + mockClient.AssertExpectations(t) +} + +func TestHiveDropNamespaceNotExists(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetDatabase", mock.Anything, "nonexistent"). + Return(nil, errNoSuchObject).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + err := hiveCatalog.DropNamespace(context.TODO(), DatabaseIdentifier("nonexistent")) + assert.Error(err) + assert.True(errors.Is(err, catalog.ErrNoSuchNamespace)) + + mockClient.AssertExpectations(t) +} + +func TestHiveDropNamespaceNotEmpty(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetDatabase", mock.Anything, "nonempty_db"). + Return(&hive_metastore.Database{Name: "nonempty_db"}, nil).Once() + + mockClient.On("DropDatabase", mock.Anything, "nonempty_db", false, false). 
+ Return(errInvalidOperation).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + err := hiveCatalog.DropNamespace(context.TODO(), DatabaseIdentifier("nonempty_db")) + assert.Error(err) + assert.True(errors.Is(err, catalog.ErrNamespaceNotEmpty)) + + mockClient.AssertExpectations(t) +} + +func TestHiveCheckNamespaceExists(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetDatabase", mock.Anything, "existing_db"). + Return(&hive_metastore.Database{Name: "existing_db"}, nil).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + exists, err := hiveCatalog.CheckNamespaceExists(context.TODO(), DatabaseIdentifier("existing_db")) + assert.NoError(err) + assert.True(exists) + + mockClient.AssertExpectations(t) +} + +func TestHiveCheckNamespaceNotExists(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetDatabase", mock.Anything, "nonexistent_db"). + Return(nil, errNoSuchObject).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + exists, err := hiveCatalog.CheckNamespaceExists(context.TODO(), DatabaseIdentifier("nonexistent_db")) + assert.NoError(err) + assert.False(exists) + + mockClient.AssertExpectations(t) +} + +func TestHiveLoadNamespaceProperties(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetDatabase", mock.Anything, "test_db"). + Return(&hive_metastore.Database{ + Name: "test_db", + Description: "Test database", + LocationUri: "s3://test-bucket/test_db", + Parameters: map[string]string{ + "custom_param": "custom_value", + }, + }, nil).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + props, err := hiveCatalog.LoadNamespaceProperties(context.TODO(), DatabaseIdentifier("test_db")) + assert.NoError(err) + assert.Equal("s3://test-bucket/test_db", props["location"]) + assert.Equal("Test database", props["comment"]) + assert.Equal("custom_value", props["custom_param"]) + + mockClient.AssertExpectations(t) +} + +func TestHiveCheckTableExists(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetTable", mock.Anything, "test_database", "test_table"). + Return(testIcebergHiveTable1, nil).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + exists, err := hiveCatalog.CheckTableExists(context.TODO(), TableIdentifier("test_database", "test_table")) + assert.NoError(err) + assert.True(exists) + + mockClient.AssertExpectations(t) +} + +func TestHiveCheckTableNotExists(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetTable", mock.Anything, "test_database", "nonexistent"). + Return(nil, errNoSuchObject).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + exists, err := hiveCatalog.CheckTableExists(context.TODO(), TableIdentifier("test_database", "nonexistent")) + assert.NoError(err) + assert.False(exists) + + mockClient.AssertExpectations(t) +} + +func TestHiveCheckTableExistsNonIceberg(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetTable", mock.Anything, "test_database", "other_table"). 
+ Return(testNonIcebergHiveTable, nil).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + exists, err := hiveCatalog.CheckTableExists(context.TODO(), TableIdentifier("test_database", "other_table")) + assert.NoError(err) + assert.False(exists) // Non-Iceberg table should return false + + mockClient.AssertExpectations(t) +} + +func TestHiveDropTable(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetTable", mock.Anything, "test_database", "test_table"). + Return(testIcebergHiveTable1, nil).Once() + + mockClient.On("DropTable", mock.Anything, "test_database", "test_table", false). + Return(nil).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + err := hiveCatalog.DropTable(context.TODO(), TableIdentifier("test_database", "test_table")) + assert.NoError(err) + + mockClient.AssertExpectations(t) +} + +func TestHiveDropTableNotExists(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetTable", mock.Anything, "test_database", "nonexistent"). + Return(nil, errNoSuchObject).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + err := hiveCatalog.DropTable(context.TODO(), TableIdentifier("test_database", "nonexistent")) + assert.Error(err) + assert.True(errors.Is(err, catalog.ErrNoSuchTable)) + + mockClient.AssertExpectations(t) +} + +func TestHiveDropTableNonIceberg(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetTable", mock.Anything, "test_database", "other_table"). + Return(testNonIcebergHiveTable, nil).Once() + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + err := hiveCatalog.DropTable(context.TODO(), TableIdentifier("test_database", "other_table")) + assert.Error(err) + assert.Contains(err.Error(), "is not an Iceberg table") + + mockClient.AssertExpectations(t) +} + +func TestHiveCatalogType(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + assert.Equal(catalog.Hive, hiveCatalog.CatalogType()) +} + +func TestIsIcebergTable(t *testing.T) { + tests := []struct { + name string + table *hive_metastore.Table + expected bool + }{ + { + name: "iceberg table uppercase", + table: testIcebergHiveTable1, + expected: true, + }, + { + name: "iceberg table lowercase", + table: &hive_metastore.Table{ + Parameters: map[string]string{TableTypeKey: "iceberg"}, + }, + expected: true, + }, + { + name: "iceberg table mixed case", + table: &hive_metastore.Table{ + Parameters: map[string]string{TableTypeKey: "IcEbErG"}, + }, + expected: true, + }, + { + name: "non-iceberg table", + table: testNonIcebergHiveTable, + expected: false, + }, + { + name: "nil table", + table: nil, + expected: false, + }, + { + name: "table without parameters", + table: &hive_metastore.Table{}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := require.New(t) + assert.Equal(tt.expected, isIcebergTable(tt.table)) + }) + } +} + +func TestIcebergTypeToHiveType(t *testing.T) { + tests := []struct { + name string + icebergType iceberg.Type + expectedHive string + }{ + {"boolean", iceberg.PrimitiveTypes.Bool, "boolean"}, + {"int32", iceberg.PrimitiveTypes.Int32, "int"}, + {"int64", iceberg.PrimitiveTypes.Int64, "bigint"}, + {"float32", iceberg.PrimitiveTypes.Float32, "float"}, + {"float64", 
iceberg.PrimitiveTypes.Float64, "double"}, + {"date", iceberg.PrimitiveTypes.Date, "date"}, + {"time", iceberg.PrimitiveTypes.Time, "string"}, + {"timestamp", iceberg.PrimitiveTypes.Timestamp, "timestamp"}, + {"timestamptz", iceberg.PrimitiveTypes.TimestampTz, "timestamp"}, + {"string", iceberg.PrimitiveTypes.String, "string"}, + {"uuid", iceberg.PrimitiveTypes.UUID, "string"}, + {"binary", iceberg.PrimitiveTypes.Binary, "binary"}, + {"decimal", iceberg.DecimalTypeOf(10, 2), "decimal(10,2)"}, + {"fixed", iceberg.FixedTypeOf(16), "binary(16)"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := require.New(t) + assert.Equal(tt.expectedHive, icebergTypeToHiveType(tt.icebergType)) + }) + } +} + +func TestSchemaToHiveColumns(t *testing.T) { + assert := require.New(t) + + columns := schemaToHiveColumns(testSchema) + assert.Len(columns, 3) + + // Check first column + assert.Equal("foo", columns[0].Name) + assert.Equal("string", columns[0].Type) + + // Check second column + assert.Equal("bar", columns[1].Name) + assert.Equal("int", columns[1].Type) + + // Check third column + assert.Equal("baz", columns[2].Name) + assert.Equal("boolean", columns[2].Type) +} + +func TestUpdateNamespaceProperties(t *testing.T) { + tests := []struct { + name string + initial map[string]string + updates map[string]string + removals []string + expected catalog.PropertiesUpdateSummary + shouldError bool + }{ + { + name: "Overlapping removals and updates", + initial: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + updates: map[string]string{ + "key1": "new_value1", + }, + removals: []string{"key1"}, + shouldError: true, + }, + { + name: "Happy path with updates and removals", + initial: map[string]string{ + "key1": "value1", + "key2": "value2", + "key4": "value4", + }, + updates: map[string]string{ + "key2": "new_value2", + }, + removals: []string{"key4"}, + expected: catalog.PropertiesUpdateSummary{ + Removed: []string{"key4"}, + Updated: []string{"key2"}, + Missing: []string{}, + }, + shouldError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert := require.New(t) + + mockClient := &mockHiveClient{} + + mockClient.On("GetDatabase", mock.Anything, "test_namespace"). + Return(&hive_metastore.Database{ + Name: "test_namespace", + Parameters: tt.initial, + }, nil).Once() + + if !tt.shouldError { + mockClient.On("AlterDatabase", mock.Anything, "test_namespace", mock.Anything). 
+ Return(nil).Once() + } + + hiveCatalog := NewCatalogWithClient(mockClient, iceberg.Properties{}) + + summary, err := hiveCatalog.UpdateNamespaceProperties(context.TODO(), DatabaseIdentifier("test_namespace"), tt.removals, tt.updates) + if tt.shouldError { + assert.Error(err) + } else { + assert.NoError(err) + assert.ElementsMatch(tt.expected.Removed, summary.Removed) + assert.ElementsMatch(tt.expected.Updated, summary.Updated) + assert.ElementsMatch(tt.expected.Missing, summary.Missing) + } + }) + } +} + +func TestIdentifierValidation(t *testing.T) { + t.Run("valid table identifier", func(t *testing.T) { + assert := require.New(t) + db, tbl, err := identifierToTableName([]string{"database", "table"}) + assert.NoError(err) + assert.Equal("database", db) + assert.Equal("table", tbl) + }) + + t.Run("invalid table identifier - too short", func(t *testing.T) { + assert := require.New(t) + _, _, err := identifierToTableName([]string{"database"}) + assert.Error(err) + }) + + t.Run("invalid table identifier - too long", func(t *testing.T) { + assert := require.New(t) + _, _, err := identifierToTableName([]string{"a", "b", "c"}) + assert.Error(err) + }) + + t.Run("valid database identifier", func(t *testing.T) { + assert := require.New(t) + db, err := identifierToDatabase([]string{"database"}) + assert.NoError(err) + assert.Equal("database", db) + }) + + t.Run("invalid database identifier", func(t *testing.T) { + assert := require.New(t) + _, err := identifierToDatabase([]string{"a", "b"}) + assert.Error(err) + }) +} diff --git a/catalog/hive/options.go b/catalog/hive/options.go new file mode 100644 index 000000000..ec55dc86b --- /dev/null +++ b/catalog/hive/options.go @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package hive + +import ( + "github.com/apache/iceberg-go" +) + +// Configuration property keys for the Hive catalog. +const ( + // URI is the Thrift URI for the Hive Metastore (e.g., "thrift://localhost:9083") + URI = "uri" + + // Warehouse is the default warehouse location for tables + Warehouse = "warehouse" + + // KerberosAuth enables Kerberos authentication + KerberosAuth = "hive.kerberos-authentication" + + TableTypeKey = "table_type" + TableTypeIceberg = "ICEBERG" + TableTypeExternalTable = "EXTERNAL_TABLE" + MetadataLocationKey = "metadata_location" + PreviousMetadataLocationKey = "previous_metadata_location" + ExternalKey = "EXTERNAL" +) + +// HiveOptions contains configuration options for the Hive Metastore catalog. +type HiveOptions struct { + URI string + Warehouse string + KerberosAuth bool + props iceberg.Properties +} + +// NewHiveOptions creates a new HiveOptions with default values. 
+func NewHiveOptions() *HiveOptions { + return &HiveOptions{ + props: iceberg.Properties{}, + } +} + +// ApplyProperties applies properties from an iceberg.Properties map. +func (o *HiveOptions) ApplyProperties(props iceberg.Properties) { + o.props = props + + if uri, ok := props[URI]; ok { + o.URI = uri + } + if warehouse, ok := props[Warehouse]; ok { + o.Warehouse = warehouse + } + if props.GetBool(KerberosAuth, false) { + o.KerberosAuth = true + } +} + +// Option is a functional option for configuring the Hive catalog. +type Option func(*HiveOptions) + +// WithURI sets the Thrift URI for the Hive Metastore. +func WithURI(uri string) Option { + return func(o *HiveOptions) { + o.URI = uri + } +} + +// WithWarehouse sets the default warehouse location. +func WithWarehouse(warehouse string) Option { + return func(o *HiveOptions) { + o.Warehouse = warehouse + } +} + +// WithKerberosAuth enables Kerberos authentication. +func WithKerberosAuth(enabled bool) Option { + return func(o *HiveOptions) { + o.KerberosAuth = enabled + } +} + +// WithProperties sets additional properties for the catalog. +func WithProperties(props iceberg.Properties) Option { + return func(o *HiveOptions) { + o.props = props + } +} diff --git a/catalog/hive/schema.go b/catalog/hive/schema.go new file mode 100644 index 000000000..cf3cc4260 --- /dev/null +++ b/catalog/hive/schema.go @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package hive + +import ( + "fmt" + "strings" + + "github.com/apache/iceberg-go" + "github.com/beltran/gohive/hive_metastore" +) + +// schemaToHiveColumns converts an Iceberg schema to Hive FieldSchema columns. +func schemaToHiveColumns(schema *iceberg.Schema) []*hive_metastore.FieldSchema { + columns := make([]*hive_metastore.FieldSchema, 0, len(schema.Fields())) + for _, field := range schema.Fields() { + columns = append(columns, fieldToHiveColumn(field)) + } + return columns +} + +// fieldToHiveColumn converts an Iceberg NestedField to a Hive FieldSchema. +func fieldToHiveColumn(field iceberg.NestedField) *hive_metastore.FieldSchema { + return &hive_metastore.FieldSchema{ + Name: field.Name, + Type: icebergTypeToHiveType(field.Type), + Comment: field.Doc, + } +} + +// icebergTypeToHiveType converts an Iceberg type to a Hive type string. 
+// Reference: https://cwiki.apache.org/confluence/display/hive/languagemanual+types +func icebergTypeToHiveType(typ iceberg.Type) string { + switch t := typ.(type) { + case iceberg.BooleanType: + return "boolean" + case iceberg.Int32Type: + return "int" + case iceberg.Int64Type: + return "bigint" + case iceberg.Float32Type: + return "float" + case iceberg.Float64Type: + return "double" + case iceberg.DateType: + return "date" + case iceberg.TimeType: + // Hive doesn't have a native time type, use string + return "string" + case iceberg.TimestampType: + return "timestamp" + case iceberg.TimestampTzType: + return "timestamp" + case iceberg.StringType: + return "string" + case iceberg.UUIDType: + // Represent UUID as string + return "string" + case iceberg.BinaryType: + return "binary" + case iceberg.DecimalType: + return fmt.Sprintf("decimal(%d,%d)", t.Precision(), t.Scale()) + case iceberg.FixedType: + return fmt.Sprintf("binary(%d)", t.Len()) + case *iceberg.StructType: + var fieldStrings []string + for _, field := range t.Fields() { + fieldStrings = append(fieldStrings, + fmt.Sprintf("%s:%s", field.Name, icebergTypeToHiveType(field.Type))) + } + return fmt.Sprintf("struct<%s>", strings.Join(fieldStrings, ",")) + case *iceberg.ListType: + elementField := t.ElementField() + return fmt.Sprintf("array<%s>", icebergTypeToHiveType(elementField.Type)) + case *iceberg.MapType: + keyField := t.KeyField() + valueField := t.ValueField() + return fmt.Sprintf("map<%s,%s>", + icebergTypeToHiveType(keyField.Type), + icebergTypeToHiveType(valueField.Type)) + default: + return "string" + } +} + +// constructHiveTable creates a Hive Table struct for an Iceberg table. +func constructHiveTable(dbName, tableName, location, metadataLocation string, schema *iceberg.Schema, props map[string]string) *hive_metastore.Table { + parameters := make(map[string]string) + + // Set Iceberg-specific parameters + parameters[TableTypeKey] = TableTypeIceberg + parameters[MetadataLocationKey] = metadataLocation + parameters[ExternalKey] = "TRUE" + + // Set storage handler - required for Hive to query Iceberg tables + parameters["storage_handler"] = "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler" + + // Copy additional properties + for k, v := range props { + parameters[k] = v + } + + return &hive_metastore.Table{ + TableName: tableName, + DbName: dbName, + TableType: TableTypeExternalTable, + Sd: &hive_metastore.StorageDescriptor{ + Cols: schemaToHiveColumns(schema), + Location: location, + InputFormat: "org.apache.iceberg.mr.hive.HiveIcebergInputFormat", + OutputFormat: "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat", + SerdeInfo: &hive_metastore.SerDeInfo{ + SerializationLib: "org.apache.iceberg.mr.hive.HiveIcebergSerDe", + }, + }, + Parameters: parameters, + } +} + +// updateHiveTableForCommit updates a Hive Table struct for a commit operation. +func updateHiveTableForCommit(existing *hive_metastore.Table, newMetadataLocation string) *hive_metastore.Table { + // Copy the existing table + updated := *existing + + // Update parameters + if updated.Parameters == nil { + updated.Parameters = make(map[string]string) + } + + // Store previous metadata location + if oldLocation, ok := updated.Parameters[MetadataLocationKey]; ok { + updated.Parameters[PreviousMetadataLocationKey] = oldLocation + } + + // Set new metadata location + updated.Parameters[MetadataLocationKey] = newMetadataLocation + + return &updated +} + +// isIcebergTable checks if a Hive table is an Iceberg table. 
+func isIcebergTable(tbl *hive_metastore.Table) bool { + if tbl == nil || tbl.Parameters == nil { + return false + } + + tableType, ok := tbl.Parameters[TableTypeKey] + if !ok { + return false + } + + return strings.EqualFold(tableType, TableTypeIceberg) +} + +// getMetadataLocation returns the metadata location from a Hive table. +func getMetadataLocation(tbl *hive_metastore.Table) (string, error) { + if tbl == nil || tbl.Parameters == nil { + return "", fmt.Errorf("table has no parameters") + } + + location, ok := tbl.Parameters[MetadataLocationKey] + if !ok { + return "", fmt.Errorf("table does not have %s parameter", MetadataLocationKey) + } + + return location, nil +} diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go index 6d92ca0ee..128385914 100644 --- a/cmd/iceberg/main.go +++ b/cmd/iceberg/main.go @@ -28,6 +28,7 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/glue" + "github.com/apache/iceberg-go/catalog/hive" "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/config" "github.com/apache/iceberg-go/table" @@ -196,6 +197,16 @@ func main() { glue.WithAwsConfig(awscfg), } cat = glue.NewCatalog(opts...) + case catalog.Hive: + props := iceberg.Properties{ + hive.URI: cfg.URI, + } + if len(cfg.Warehouse) > 0 { + props[hive.Warehouse] = cfg.Warehouse + } + if cat, err = hive.NewCatalog(props); err != nil { + log.Fatal(err) + } default: log.Fatal("unrecognized catalog type") } diff --git a/go.mod b/go.mod index 29a7cb6cf..f15d9ad79 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.94.0 github.com/aws/smithy-go v1.24.0 github.com/awsdocs/aws-doc-sdk-examples/gov2/testtools v0.0.0-20250407191926-092f3e54b837 + github.com/beltran/gohive v1.8.1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/docker/docker v28.5.2+incompatible github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 @@ -100,6 +101,8 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.30.8 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.12 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.41.5 // indirect + github.com/beltran/gosasl v1.0.0 // indirect + github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/buger/goterm v1.0.4 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect @@ -150,6 +153,7 @@ require ( github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/go-zookeeper/zk v1.0.4 // indirect github.com/goccy/go-json v0.10.5 // indirect github.com/goccy/go-yaml v1.17.1 // indirect github.com/gofrs/flock v0.12.1 // indirect diff --git a/go.sum b/go.sum index 15b080166..cb686c407 100644 --- a/go.sum +++ b/go.sum @@ -148,6 +148,12 @@ github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk= github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/awsdocs/aws-doc-sdk-examples/gov2/testtools v0.0.0-20250407191926-092f3e54b837 h1:8eMceEa0ib+nqJuGsyowuZaVBVAr685oK6WrNIit+0g= github.com/awsdocs/aws-doc-sdk-examples/gov2/testtools v0.0.0-20250407191926-092f3e54b837/go.mod h1:9Oj/8PZn3D5Ftp/Z1QWrIEFE0daERMqfJawL9duHRfc= +github.com/beltran/gohive v1.8.1 h1:qlygmroy3mKtKIQSpV/FqXJHty1LsPxF+JTQA5mbjwU= +github.com/beltran/gohive v1.8.1/go.mod 
h1:BCgNAhr/wnbyXfp2yN9ZY4pVrGrtVqG4hhNDDXIal1U= +github.com/beltran/gosasl v1.0.0 h1:iiRtLxkvKhrNv3Ohh/n2NiyyfwIo/UbMzy/dZWiUHXE= +github.com/beltran/gosasl v1.0.0/go.mod h1:Qx8cW6jkI8riyzmklj80kAIkv+iezFUTBiGU0qHhHes= +github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab h1:ayfcn60tXOSYy5zUN1AMSTQo4nJCf7hrdzAVchpPst4= +github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab/go.mod h1:GLe4UoSyvJ3cVG+DVtKen5eAiaD8mAJFuV5PT3Eeg9Q= github.com/beorn7/perks v0.0.0-20150223135152-b965b613227f/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -315,6 +321,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/go-zookeeper/zk v1.0.4 h1:DPzxraQx7OrPyXq2phlGlNSIyWEsAox0RJmjTseMV6I= +github.com/go-zookeeper/zk v1.0.4/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/goccy/go-yaml v1.17.1 h1:LI34wktB2xEE3ONG/2Ar54+/HJVBriAGJ55PHls4YuY= diff --git a/internal/recipe/docker-compose.yml b/internal/recipe/docker-compose.yml index 7bf8ae483..5abe44914 100644 --- a/internal/recipe/docker-compose.yml +++ b/internal/recipe/docker-compose.yml @@ -118,5 +118,42 @@ services: - 4443:4443 command: ["-scheme", "http", "-port", "4443", "-backend", "memory", "-public-host", "fake-gcs-server:4443"] + hive-metastore: + image: apache/hive:4.0.0 + container_name: hive-metastore + user: root + networks: + iceberg_net: + ports: + - 9083:9083 + environment: + - SERVICE_NAME=metastore + - DB_DRIVER=derby + volumes: + - hive-metastore-data:/opt/hive/data + - /tmp/iceberg-warehouse:/tmp/iceberg-warehouse + + hiveserver2: + image: apache/hive:4.0.0 + container_name: hiveserver2 + user: root + networks: + iceberg_net: + depends_on: + - hive-metastore + ports: + - "10003:10000" # Adjusted to avoid conflict with spark-iceberg + - "10002:10002" # Web UI + environment: + - SERVICE_NAME=hiveserver2 + - SERVICE_OPTS=-Dhive.metastore.uris=thrift://hive-metastore:9083 + - IS_RESUME=true + volumes: + - hive-metastore-data:/opt/hive/data + - /tmp/iceberg-warehouse:/tmp/iceberg-warehouse + networks: iceberg_net: + +volumes: + hive-metastore-data: From 51b1dda826dec62e9ce88f47d04443616f010dc0 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Wed, 14 Jan 2026 19:50:39 -0600 Subject: [PATCH 2/7] Added license to README.md --- catalog/hive/README.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/catalog/hive/README.md b/catalog/hive/README.md index d390972b7..b53ece3da 100644 --- a/catalog/hive/README.md +++ b/catalog/hive/README.md @@ -1,3 +1,20 @@ + + # Hive Metastore Catalog This package provides a Hive Metastore catalog implementation for Apache Iceberg in Go. From 8f56ae77cd23fab13dfb6e3daf4252b1273a6cf2 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Wed, 14 Jan 2026 20:22:16 -0600 Subject: [PATCH 3/7] Fix lint errors. 
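
The hunks below also pass over the error-classification helpers in hive.go, which detect Hive Metastore failures by matching exception names inside the Thrift error text. The following is a minimal, self-contained sketch of that pattern combined with Go error wrapping; `classify` and the local `ErrNoSuchTable` are hypothetical stand-ins used only to keep the example runnable, not identifiers from this patch:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// ErrNoSuchTable stands in for the sentinel errors exported by the catalog
// package; it exists only to keep this sketch self-contained.
var ErrNoSuchTable = errors.New("table does not exist")

// classify wraps a raw Thrift error with a typed sentinel when the Hive
// exception name appears in its message, so callers can use errors.Is.
func classify(err error) error {
	if err == nil {
		return nil
	}
	if strings.Contains(err.Error(), "NoSuchObjectException") {
		return fmt.Errorf("%w: %v", ErrNoSuchTable, err)
	}

	return err
}

func main() {
	raw := errors.New("NoSuchObjectException: test_database.missing not found")
	fmt.Println(errors.Is(classify(raw), ErrNoSuchTable)) // prints: true
}
```
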
--- catalog/hive/hive.go | 20 ++++++++++++++++++++ catalog/hive/hive_test.go | 21 +++++++++++++++++---- catalog/hive/schema.go | 7 ++++++- go.mod | 11 ++--------- 4 files changed, 45 insertions(+), 14 deletions(-) diff --git a/catalog/hive/hive.go b/catalog/hive/hive.go index a34f955b8..4a08d1e86 100644 --- a/catalog/hive/hive.go +++ b/catalog/hive/hive.go @@ -84,6 +84,7 @@ func NewCatalog(props iceberg.Properties, opts ...Option) (*Catalog, error) { func NewCatalogWithClient(client HiveClient, props iceberg.Properties) *Catalog { o := NewHiveOptions() o.ApplyProperties(props) + return &Catalog{ client: client, opts: o, @@ -98,6 +99,7 @@ func (c *Catalog) CatalogType() catalog.Type { // Close closes the connection to the Hive Metastore. func (c *Catalog) Close() error { c.client.Close() + return nil } @@ -107,6 +109,7 @@ func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) it database, err := identifierToDatabase(namespace) if err != nil { yield(nil, err) + return } @@ -114,6 +117,7 @@ func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) it tableNames, err := c.client.GetTables(ctx, database, "*") if err != nil { yield(nil, fmt.Errorf("failed to list tables in %s: %w", database, err)) + return } @@ -198,6 +202,7 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, if isAlreadyExistsError(err) { return nil, fmt.Errorf("%w: %s.%s", catalog.ErrTableAlreadyExists, database, tableName) } + return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err) } @@ -334,6 +339,7 @@ func (c *Catalog) CheckTableExists(ctx context.Context, identifier table.Identif if isNoSuchObjectError(err) { return false, nil } + return false, err } @@ -389,6 +395,7 @@ func (c *Catalog) CreateNamespace(ctx context.Context, namespace table.Identifie if isAlreadyExistsError(err) { return fmt.Errorf("%w: %s", catalog.ErrNamespaceAlreadyExists, database) } + return fmt.Errorf("failed to create namespace %s: %w", database, err) } @@ -408,6 +415,7 @@ func (c *Catalog) DropNamespace(ctx context.Context, namespace table.Identifier) if isNoSuchObjectError(err) { return fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, database) } + return err } @@ -416,6 +424,7 @@ func (c *Catalog) DropNamespace(ctx context.Context, namespace table.Identifier) if isInvalidOperationError(err) { return fmt.Errorf("%w: %s", catalog.ErrNamespaceNotEmpty, database) } + return fmt.Errorf("failed to drop namespace %s: %w", database, err) } @@ -434,6 +443,7 @@ func (c *Catalog) CheckNamespaceExists(ctx context.Context, namespace table.Iden if isNoSuchObjectError(err) { return false, nil } + return false, err } @@ -452,6 +462,7 @@ func (c *Catalog) LoadNamespaceProperties(ctx context.Context, namespace table.I if isNoSuchObjectError(err) { return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, database) } + return nil, fmt.Errorf("failed to get namespace %s: %w", database, err) } @@ -520,6 +531,7 @@ func (c *Catalog) getIcebergTable(ctx context.Context, database, tableName strin if isNoSuchObjectError(err) { return nil, fmt.Errorf("%w: %s.%s", catalog.ErrNoSuchTable, database, tableName) } + return nil, fmt.Errorf("failed to get table %s.%s: %w", database, tableName, err) } @@ -534,6 +546,7 @@ func identifierToTableName(identifier table.Identifier) (string, string, error) if len(identifier) != 2 { return "", "", fmt.Errorf("invalid identifier, expected [database, table]: %v", identifier) } + return identifier[0], identifier[1], nil } @@ -541,6 +554,7 
@@ func identifierToDatabase(identifier table.Identifier) (string, error) { if len(identifier) != 1 { return "", fmt.Errorf("invalid identifier, expected [database]: %v", identifier) } + return identifier[0], nil } @@ -560,7 +574,9 @@ func isNoSuchObjectError(err error) bool { if err == nil { return false } + errStr := err.Error() + return strings.Contains(errStr, "NoSuchObjectException") || strings.Contains(errStr, "not found") || strings.Contains(errStr, "does not exist") @@ -570,7 +586,9 @@ func isAlreadyExistsError(err error) bool { if err == nil { return false } + errStr := err.Error() + return strings.Contains(errStr, "AlreadyExistsException") || strings.Contains(errStr, "already exists") } @@ -579,7 +597,9 @@ func isInvalidOperationError(err error) bool { if err == nil { return false } + errStr := err.Error() + return strings.Contains(errStr, "InvalidOperationException") || strings.Contains(errStr, "is not empty") } diff --git a/catalog/hive/hive_test.go b/catalog/hive/hive_test.go index 5712d48c6..9d107e367 100644 --- a/catalog/hive/hive_test.go +++ b/catalog/hive/hive_test.go @@ -40,26 +40,31 @@ func (m *mockHiveClient) GetDatabase(ctx context.Context, name string) (*hive_me if args.Get(0) == nil { return nil, args.Error(1) } + return args.Get(0).(*hive_metastore.Database), args.Error(1) } func (m *mockHiveClient) GetAllDatabases(ctx context.Context) ([]string, error) { args := m.Called(ctx) + return args.Get(0).([]string), args.Error(1) } func (m *mockHiveClient) CreateDatabase(ctx context.Context, database *hive_metastore.Database) error { args := m.Called(ctx, database) + return args.Error(0) } func (m *mockHiveClient) DropDatabase(ctx context.Context, name string, deleteData, cascade bool) error { args := m.Called(ctx, name, deleteData, cascade) + return args.Error(0) } func (m *mockHiveClient) AlterDatabase(ctx context.Context, name string, database *hive_metastore.Database) error { args := m.Called(ctx, name, database) + return args.Error(0) } @@ -68,26 +73,31 @@ func (m *mockHiveClient) GetTable(ctx context.Context, dbName, tableName string) if args.Get(0) == nil { return nil, args.Error(1) } + return args.Get(0).(*hive_metastore.Table), args.Error(1) } func (m *mockHiveClient) GetTables(ctx context.Context, dbName, pattern string) ([]string, error) { args := m.Called(ctx, dbName, pattern) + return args.Get(0).([]string), args.Error(1) } func (m *mockHiveClient) CreateTable(ctx context.Context, tbl *hive_metastore.Table) error { args := m.Called(ctx, tbl) + return args.Error(0) } func (m *mockHiveClient) DropTable(ctx context.Context, dbName, tableName string, deleteData bool) error { args := m.Called(ctx, dbName, tableName, deleteData) + return args.Error(0) } func (m *mockHiveClient) AlterTable(ctx context.Context, dbName, tableName string, newTable *hive_metastore.Table) error { args := m.Called(ctx, dbName, tableName, newTable) + return args.Error(0) } @@ -138,10 +148,11 @@ var testSchema = iceberg.NewSchemaWithIdentifiers(0, []int{}, iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool}) // Error helpers for mocking - -var errNoSuchObject = errors.New("NoSuchObjectException: object not found") -var errAlreadyExists = errors.New("AlreadyExistsException: object already exists") -var errInvalidOperation = errors.New("InvalidOperationException: Database is not empty") +var ( + errNoSuchObject = errors.New("NoSuchObjectException: object not found") + errAlreadyExists = errors.New("AlreadyExistsException: object already exists") + errInvalidOperation = 
errors.New("InvalidOperationException: Database is not empty") +) // Tests @@ -170,6 +181,7 @@ func TestHiveListTables(t *testing.T) { for tbl, err := range iter { if err != nil { lastErr = err + break } tbls = append(tbls, tbl) @@ -200,6 +212,7 @@ func TestHiveListTablesEmpty(t *testing.T) { for tbl, err := range iter { if err != nil { lastErr = err + break } tbls = append(tbls, tbl) diff --git a/catalog/hive/schema.go b/catalog/hive/schema.go index cf3cc4260..ea1891d4f 100644 --- a/catalog/hive/schema.go +++ b/catalog/hive/schema.go @@ -18,6 +18,7 @@ package hive import ( + "errors" "fmt" "strings" @@ -31,6 +32,7 @@ func schemaToHiveColumns(schema *iceberg.Schema) []*hive_metastore.FieldSchema { for _, field := range schema.Fields() { columns = append(columns, fieldToHiveColumn(field)) } + return columns } @@ -83,13 +85,16 @@ func icebergTypeToHiveType(typ iceberg.Type) string { fieldStrings = append(fieldStrings, fmt.Sprintf("%s:%s", field.Name, icebergTypeToHiveType(field.Type))) } + return fmt.Sprintf("struct<%s>", strings.Join(fieldStrings, ",")) case *iceberg.ListType: elementField := t.ElementField() + return fmt.Sprintf("array<%s>", icebergTypeToHiveType(elementField.Type)) case *iceberg.MapType: keyField := t.KeyField() valueField := t.ValueField() + return fmt.Sprintf("map<%s,%s>", icebergTypeToHiveType(keyField.Type), icebergTypeToHiveType(valueField.Type)) @@ -170,7 +175,7 @@ func isIcebergTable(tbl *hive_metastore.Table) bool { // getMetadataLocation returns the metadata location from a Hive table. func getMetadataLocation(tbl *hive_metastore.Table) (string, error) { if tbl == nil || tbl.Parameters == nil { - return "", fmt.Errorf("table has no parameters") + return "", errors.New("table has no parameters") } location, ok := tbl.Parameters[MetadataLocationKey] diff --git a/go.mod b/go.mod index 1798fb226..34ffd6bad 100644 --- a/go.mod +++ b/go.mod @@ -94,15 +94,6 @@ require ( github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.7 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.16 // indirect - github.com/aws/aws-sdk-go-v2/service/signin v1.0.4 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.30.8 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.12 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.41.5 // indirect - github.com/beltran/gosasl v1.0.0 // indirect - github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.17 // indirect @@ -110,6 +101,8 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.30.9 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.13 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect + github.com/beltran/gosasl v1.0.0 // indirect + github.com/beltran/gssapi v0.0.0-20200324152954-d86554db4bab // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/buger/goterm v1.0.4 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect From d85f65d0524d5da909b258944d443c1618dc444e Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: 
Wed, 14 Jan 2026 20:31:20 -0600 Subject: [PATCH 4/7] Fixed staticcheck errors --- catalog/hive/hive.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catalog/hive/hive.go b/catalog/hive/hive.go index 4a08d1e86..c2bdee814 100644 --- a/catalog/hive/hive.go +++ b/catalog/hive/hive.go @@ -351,7 +351,7 @@ func (c *Catalog) CheckTableExists(ctx context.Context, identifier table.Identif // ListNamespaces returns a list of namespaces in the catalog. func (c *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) { // Hive doesn't support hierarchical namespaces - if parent != nil && len(parent) > 0 { + if len(parent) > 0 { return nil, errors.New("hierarchical namespace is not supported") } From c3e50aad5e8dd027d98f8562f61916d3e71dd72c Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Wed, 14 Jan 2026 21:01:49 -0600 Subject: [PATCH 5/7] Added hive integration test to github actions. --- .github/workflows/go-integration.yml | 2 +- catalog/hive/hive_integration_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/go-integration.yml b/.github/workflows/go-integration.yml index d8d94427c..76941e45f 100644 --- a/.github/workflows/go-integration.yml +++ b/.github/workflows/go-integration.yml @@ -69,7 +69,7 @@ jobs: go test -tags integration -v -run="^TestScanner" ./table go test -tags integration -v ./io go test -tags integration -v -run="^TestRestIntegration$" ./catalog/rest - + go test -tags=integration -v ./catalog/hive/... - name: Run spark integration tests env: AWS_S3_ENDPOINT: "${{ env.AWS_S3_ENDPOINT }}" diff --git a/catalog/hive/hive_integration_test.go b/catalog/hive/hive_integration_test.go index 046c46616..cc0e6f1fd 100644 --- a/catalog/hive/hive_integration_test.go +++ b/catalog/hive/hive_integration_test.go @@ -147,8 +147,9 @@ func TestHiveIntegrationUpdateNamespaceProperties(t *testing.T) { // Create namespace with initial properties initialProps := iceberg.Properties{ - "key1": "value1", - "key2": "value2", + "location": fmt.Sprintf("/tmp/iceberg-warehouse/%s", dbName), + "key1": "value1", + "key2": "value2", } err := cat.CreateNamespace(context.TODO(), DatabaseIdentifier(dbName), initialProps) From 5e9a1ed996d72102e7df47b5203978aba77a67c7 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Thu, 22 Jan 2026 18:02:24 -0600 Subject: [PATCH 6/7] Added lock logic to commitTable --- catalog/hive/README.md | 208 -------------------------------------- catalog/hive/client.go | 24 ++++- catalog/hive/hive.go | 48 ++------- catalog/hive/hive_test.go | 25 ++++- catalog/hive/options.go | 48 +++++++-- catalog/hive/schema.go | 5 - 6 files changed, 91 insertions(+), 267 deletions(-) delete mode 100644 catalog/hive/README.md diff --git a/catalog/hive/README.md b/catalog/hive/README.md deleted file mode 100644 index b53ece3da..000000000 --- a/catalog/hive/README.md +++ /dev/null @@ -1,208 +0,0 @@ - - -# Hive Metastore Catalog - -This package provides a Hive Metastore catalog implementation for Apache Iceberg in Go. 
- -## Features - -- Full `catalog.Catalog` interface implementation -- Namespace (database) management: create, drop, list, describe -- Table management: create, drop, rename, list, load -- Table metadata commits with optimistic locking -- Support for NOSASL and Kerberos authentication -- Uses the [gohive](https://github.com/beltran/gohive) library for Thrift communication - -## Prerequisites - -### Running Hive Metastore Locally - -Start the Hive Metastore using Docker Compose: - -```bash -cd internal/recipe -docker-compose up -d hive-metastore hiveserver2 -``` - -This starts: -- **hive-metastore** on port `9083` (Thrift RPC) -- **hiveserver2** on port `10002` (Web UI) and `10003` (JDBC) - -Verify the services are running: - -```bash -docker ps | grep hive -``` - -Access the HiveServer2 Web UI at: http://localhost:10002 - -## CLI Usage - -The `iceberg` CLI supports the Hive catalog. Use `--catalog hive` and `--uri thrift://localhost:9083`. - -### List Namespaces - -```bash -go run ./cmd/iceberg list --catalog hive --uri thrift://localhost:9083 -``` - -### Create a Namespace - -```bash -# Use /tmp/iceberg-warehouse which is mounted in Docker containers -go run ./cmd/iceberg create namespace --catalog hive --uri thrift://localhost:9083 \ - --description "Test namespace for Iceberg tables" \ - --location-uri /tmp/iceberg-warehouse/test_ns \ - test_ns -``` - -### Describe a Namespace - -```bash -go run ./cmd/iceberg describe namespace --catalog hive --uri thrift://localhost:9083 test_ns -``` - -### Create a Table - -```bash -go run ./cmd/iceberg create table --catalog hive --uri thrift://localhost:9083 \ - --schema '[{"name":"id","type":"long","required":true},{"name":"name","type":"string"},{"name":"created_at","type":"timestamp"}]' \ - --location-uri /tmp/iceberg-warehouse/test_ns/users \ - test_ns.users -``` - -### List Tables in a Namespace - -```bash -go run ./cmd/iceberg list --catalog hive --uri thrift://localhost:9083 test_ns -``` - -### Describe a Table - -```bash -go run ./cmd/iceberg describe table --catalog hive --uri thrift://localhost:9083 test_ns.users -``` - -### Get Table Schema - -```bash -go run ./cmd/iceberg schema --catalog hive --uri thrift://localhost:9083 test_ns.users -``` - -### Get Table Location - -```bash -go run ./cmd/iceberg location --catalog hive --uri thrift://localhost:9083 test_ns.users -``` - -### Set Table Properties - -```bash -go run ./cmd/iceberg properties set table --catalog hive --uri thrift://localhost:9083 \ - test_ns.users write.format.default parquet -``` - -### Get Table Properties - -```bash -go run ./cmd/iceberg properties get table --catalog hive --uri thrift://localhost:9083 test_ns.users -``` - -### Drop a Table - -```bash -go run ./cmd/iceberg drop table --catalog hive --uri thrift://localhost:9083 test_ns.users -``` - -### Drop a Namespace - -```bash -go run ./cmd/iceberg drop namespace --catalog hive --uri thrift://localhost:9083 test_ns -``` - -## Programmatic Usage - -```go -package main - -import ( - "context" - "fmt" - "log" - - "github.com/apache/iceberg-go" - "github.com/apache/iceberg-go/catalog/hive" -) - -func main() { - ctx := context.Background() - - // Create catalog - props := iceberg.Properties{ - hive.URI: "thrift://localhost:9083", - hive.Warehouse: "/tmp/iceberg-warehouse", - } - - cat, err := hive.NewCatalog(props) - if err != nil { - log.Fatal(err) - } - defer cat.Close() - - // List namespaces - namespaces, err := cat.ListNamespaces(ctx, nil) - if err != nil { - log.Fatal(err) - } - - fmt.Println("Namespaces:") - for 
_, ns := range namespaces { - fmt.Printf(" - %v\n", ns) - } - - // Create a namespace - err = cat.CreateNamespace(ctx, hive.DatabaseIdentifier("my_db"), iceberg.Properties{ - "location": "/tmp/iceberg-warehouse/my_db", - "comment": "My test database", - }) - if err != nil { - log.Fatal(err) - } - - // Create a table - schema := iceberg.NewSchemaWithIdentifiers(0, []int{1}, - iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true}, - iceberg.NestedField{ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String}, - ) - - tbl, err := cat.CreateTable(ctx, hive.TableIdentifier("my_db", "my_table"), schema) - if err != nil { - log.Fatal(err) - } - - fmt.Printf("Created table: %s\n", tbl.Identifier()) -} -``` - -## Configuration Properties - -| Property | Description | Default | -|----------|-------------|---------| -| `uri` | Thrift URI for Hive Metastore (e.g., `thrift://localhost:9083`) | Required | -| `warehouse` | Default warehouse location for tables | - | -| `hive.kerberos-authentication` | Enable Kerberos authentication | `false` | diff --git a/catalog/hive/client.go b/catalog/hive/client.go index 10e4d0a10..86e990dae 100644 --- a/catalog/hive/client.go +++ b/catalog/hive/client.go @@ -27,8 +27,6 @@ import ( "github.com/beltran/gohive/hive_metastore" ) -// HiveClient interface for Hive Metastore operations. -// This allows for mocking in tests. type HiveClient interface { Close() @@ -43,14 +41,16 @@ type HiveClient interface { AlterTable(ctx context.Context, dbName, tableName string, newTbl *hive_metastore.Table) error DropTable(ctx context.Context, dbName, tableName string, deleteData bool) error GetTables(ctx context.Context, dbName, pattern string) ([]string, error) + + Lock(ctx context.Context, request *hive_metastore.LockRequest) (*hive_metastore.LockResponse, error) + CheckLock(ctx context.Context, lockId int64) (*hive_metastore.LockResponse, error) + Unlock(ctx context.Context, lockId int64) error } -// thriftClient wraps the gohive HiveMetastoreClient. type thriftClient struct { client *gohive.HiveMetastoreClient } -// NewHiveClient creates a new Hive Metastore client using gohive. func NewHiveClient(uri string, opts *HiveOptions) (HiveClient, error) { parsed, err := url.Parse(uri) if err != nil { @@ -129,3 +129,19 @@ func (c *thriftClient) DropTable(ctx context.Context, dbName, tableName string, func (c *thriftClient) GetTables(ctx context.Context, dbName, pattern string) ([]string, error) { return c.client.Client.GetTables(ctx, dbName, pattern) } + +func (c *thriftClient) Lock(ctx context.Context, request *hive_metastore.LockRequest) (*hive_metastore.LockResponse, error) { + return c.client.Client.Lock(ctx, request) +} + +func (c *thriftClient) CheckLock(ctx context.Context, lockId int64) (*hive_metastore.LockResponse, error) { + return c.client.Client.CheckLock(ctx, &hive_metastore.CheckLockRequest{ + Lockid: lockId, + }) +} + +func (c *thriftClient) Unlock(ctx context.Context, lockId int64) error { + return c.client.Client.Unlock(ctx, &hive_metastore.UnlockRequest{ + Lockid: lockId, + }) +} diff --git a/catalog/hive/hive.go b/catalog/hive/hive.go index c2bdee814..d5951af56 100644 --- a/catalog/hive/hive.go +++ b/catalog/hive/hive.go @@ -35,7 +35,6 @@ import ( ) const ( - // Property keys for namespace properties locationKey = "location" commentKey = "comment" descriptionKey = "description" @@ -49,13 +48,11 @@ func init() { })) } -// Catalog implements the catalog.Catalog interface for Hive Metastore. 
type Catalog struct { client HiveClient opts *HiveOptions } -// NewCatalog creates a new Hive Metastore catalog. func NewCatalog(props iceberg.Properties, opts ...Option) (*Catalog, error) { o := NewHiveOptions() o.ApplyProperties(props) @@ -79,8 +76,6 @@ func NewCatalog(props iceberg.Properties, opts ...Option) (*Catalog, error) { }, nil } -// NewCatalogWithClient creates a new Hive Metastore catalog with a custom client. -// This is useful for testing with mock clients. func NewCatalogWithClient(client HiveClient, props iceberg.Properties) *Catalog { o := NewHiveOptions() o.ApplyProperties(props) @@ -91,12 +86,10 @@ func NewCatalogWithClient(client HiveClient, props iceberg.Properties) *Catalog } } -// CatalogType returns the type of the catalog. func (c *Catalog) CatalogType() catalog.Type { return catalog.Hive } -// Close closes the connection to the Hive Metastore. func (c *Catalog) Close() error { c.client.Close() @@ -113,7 +106,6 @@ func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) it return } - // Get all table names in the database tableNames, err := c.client.GetTables(ctx, database, "*") if err != nil { yield(nil, fmt.Errorf("failed to list tables in %s: %w", database, err)) @@ -125,11 +117,9 @@ func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) it return } - // Check each table to see if it's an Iceberg table for _, tableName := range tableNames { tbl, err := c.client.GetTable(ctx, database, tableName) if err != nil { - // Skip tables we can't read continue } if isIcebergTable(tbl) { @@ -141,7 +131,6 @@ func (c *Catalog) ListTables(ctx context.Context, namespace table.Identifier) it } } -// LoadTable loads a table from the catalog. func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) { database, tableName, err := identifierToTableName(identifier) if err != nil { @@ -167,7 +156,6 @@ func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier) (* ) } -// CreateTable creates a new table in the catalog. func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) (*table.Table, error) { staged, err := internal.CreateStagedTable(ctx, c.opts.props, c.LoadNamespaceProperties, identifier, schema, opts...) if err != nil { @@ -179,7 +167,6 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, return nil, err } - // Get the filesystem for writing metadata afs, err := staged.FS(ctx) if err != nil { return nil, err @@ -189,13 +176,11 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, return nil, errors.New("loaded filesystem IO does not support writing") } - // Write the metadata file compression := staged.Table.Properties().Get(table.MetadataCompressionKey, table.MetadataCompressionDefault) if err := internal.WriteTableMetadata(staged.Metadata(), wfs, staged.MetadataLocation(), compression); err != nil { return nil, err } - // Create the Hive table hiveTbl := constructHiveTable(database, tableName, staged.Table.Location(), staged.MetadataLocation(), schema, staged.Table.Properties()) if err := c.client.CreateTable(ctx, hiveTbl); err != nil { @@ -209,19 +194,16 @@ func (c *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, return c.LoadTable(ctx, identifier) } -// DropTable drops a table from the catalog. 
func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier) error { database, tableName, err := identifierToTableName(identifier) if err != nil { return err } - // Verify it's an Iceberg table if _, err := c.getIcebergTable(ctx, database, tableName); err != nil { return err } - // Drop the table (deleteData=false for external tables) if err := c.client.DropTable(ctx, database, tableName, false); err != nil { return fmt.Errorf("failed to drop table %s.%s: %w", database, tableName, err) } @@ -229,7 +211,6 @@ func (c *Catalog) DropTable(ctx context.Context, identifier table.Identifier) er return nil } -// RenameTable renames a table in the catalog. func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) { fromDB, fromTable, err := identifierToTableName(from) if err != nil { @@ -241,7 +222,6 @@ func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (* return nil, err } - // Check that target namespace exists exists, err := c.CheckNamespaceExists(ctx, DatabaseIdentifier(toDB)) if err != nil { return nil, err @@ -250,17 +230,14 @@ func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (* return nil, fmt.Errorf("%w: %s", catalog.ErrNoSuchNamespace, toDB) } - // Get the existing table hiveTbl, err := c.getIcebergTable(ctx, fromDB, fromTable) if err != nil { return nil, err } - // Update table name and database hiveTbl.TableName = toTable hiveTbl.DbName = toDB - // Alter the table to rename it if err := c.client.AlterTable(ctx, fromDB, fromTable, hiveTbl); err != nil { return nil, fmt.Errorf("failed to rename table %s.%s to %s.%s: %w", fromDB, fromTable, toDB, toTable, err) } @@ -268,14 +245,20 @@ func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (* return c.LoadTable(ctx, to) } -// CommitTable commits updates to a table. 
func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) { database, tableName, err := identifierToTableName(identifier) if err != nil { return nil, "", err } - // Load current table state + lock, err := acquireLock(ctx, c.client, database, tableName, c.opts) + if err != nil { + return nil, "", fmt.Errorf("failed to acquire lock for %s.%s: %w", database, tableName, err) + } + defer func() { + _ = lock.Release(ctx) + }() + currentHiveTbl, err := c.client.GetTable(ctx, database, tableName) if err != nil && !isNoSuchObjectError(err) { return nil, "", err @@ -293,23 +276,19 @@ func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier, } } - // Create staged table with updates staged, err := internal.UpdateAndStageTable(ctx, current, identifier, requirements, updates, c) if err != nil { return nil, "", err } - // Check if there are actual changes if current != nil && staged.Metadata().Equals(current.Metadata()) { return current.Metadata(), current.MetadataLocation(), nil } - // Write new metadata if err := internal.WriteMetadata(ctx, staged.Metadata(), staged.MetadataLocation(), staged.Properties()); err != nil { return nil, "", err } - // Update Hive table if current != nil { updatedHiveTbl := updateHiveTableForCommit(currentHiveTbl, staged.MetadataLocation()) @@ -317,7 +296,6 @@ func (c *Catalog) CommitTable(ctx context.Context, identifier table.Identifier, return nil, "", fmt.Errorf("failed to commit table %s.%s: %w", database, tableName, err) } } else { - // Create new table hiveTbl := constructHiveTable(database, tableName, staged.Table.Location(), staged.MetadataLocation(), staged.Metadata().CurrentSchema(), staged.Properties()) if err := c.client.CreateTable(ctx, hiveTbl); err != nil { return nil, "", fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err) @@ -346,11 +324,7 @@ func (c *Catalog) CheckTableExists(ctx context.Context, identifier table.Identif return isIcebergTable(hiveTbl), nil } -// Namespace operations - -// ListNamespaces returns a list of namespaces in the catalog. 
func (c *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) { - // Hive doesn't support hierarchical namespaces if len(parent) > 0 { return nil, errors.New("hierarchical namespace is not supported") } @@ -409,7 +383,6 @@ func (c *Catalog) DropNamespace(ctx context.Context, namespace table.Identifier) return err } - // Check if namespace exists _, err = c.client.GetDatabase(ctx, database) if err != nil { if isNoSuchObjectError(err) { @@ -419,7 +392,6 @@ func (c *Catalog) DropNamespace(ctx context.Context, namespace table.Identifier) return err } - // Drop database (cascade=false to fail if not empty) if err := c.client.DropDatabase(ctx, database, false, false); err != nil { if isInvalidOperationError(err) { return fmt.Errorf("%w: %s", catalog.ErrNamespaceNotEmpty, database) @@ -480,8 +452,6 @@ func (c *Catalog) LoadNamespaceProperties(ctx context.Context, namespace table.I return props, nil } -// avoid circular dependency -// //go:linkname getUpdatedPropsAndUpdateSummary github.com/apache/iceberg-go/catalog.getUpdatedPropsAndUpdateSummary func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals []string, updates iceberg.Properties) (iceberg.Properties, catalog.PropertiesUpdateSummary, error) @@ -568,8 +538,6 @@ func DatabaseIdentifier(database string) table.Identifier { return []string{database} } -// Error checking helpers for Hive Metastore exceptions - func isNoSuchObjectError(err error) bool { if err == nil { return false diff --git a/catalog/hive/hive_test.go b/catalog/hive/hive_test.go index 9d107e367..3277a4580 100644 --- a/catalog/hive/hive_test.go +++ b/catalog/hive/hive_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/require" ) -// mockHiveClient is a mock implementation of HiveClient for testing. type mockHiveClient struct { mock.Mock } @@ -101,6 +100,30 @@ func (m *mockHiveClient) AlterTable(ctx context.Context, dbName, tableName strin return args.Error(0) } +func (m *mockHiveClient) Lock(ctx context.Context, request *hive_metastore.LockRequest) (*hive_metastore.LockResponse, error) { + args := m.Called(ctx, request) + if args.Get(0) == nil { + return nil, args.Error(1) + } + + return args.Get(0).(*hive_metastore.LockResponse), args.Error(1) +} + +func (m *mockHiveClient) CheckLock(ctx context.Context, lockId int64) (*hive_metastore.LockResponse, error) { + args := m.Called(ctx, lockId) + if args.Get(0) == nil { + return nil, args.Error(1) + } + + return args.Get(0).(*hive_metastore.LockResponse), args.Error(1) +} + +func (m *mockHiveClient) Unlock(ctx context.Context, lockId int64) error { + args := m.Called(ctx, lockId) + + return args.Error(0) +} + func (m *mockHiveClient) Close() { m.Called() } diff --git a/catalog/hive/options.go b/catalog/hive/options.go index ec55dc86b..a457fb047 100644 --- a/catalog/hive/options.go +++ b/catalog/hive/options.go @@ -18,10 +18,12 @@ package hive import ( + "strconv" + "time" + "github.com/apache/iceberg-go" ) -// Configuration property keys for the Hive catalog. 
const ( // URI is the Thrift URI for the Hive Metastore (e.g., "thrift://localhost:9083") URI = "uri" @@ -38,24 +40,39 @@ const ( MetadataLocationKey = "metadata_location" PreviousMetadataLocationKey = "previous_metadata_location" ExternalKey = "EXTERNAL" + + // Lock configuration property keys + LockCheckMinWaitTime = "lock-check-min-wait-time" + LockCheckMaxWaitTime = "lock-check-max-wait-time" + LockCheckRetries = "lock-check-retries" + + // Default lock configuration values + DefaultLockCheckMinWaitTime = 100 * time.Millisecond // 100ms + DefaultLockCheckMaxWaitTime = 60 * time.Second // 1 minute + DefaultLockCheckRetries = 4 ) -// HiveOptions contains configuration options for the Hive Metastore catalog. type HiveOptions struct { URI string Warehouse string KerberosAuth bool props iceberg.Properties + + // Lock configuration for atomic commits + LockMinWaitTime time.Duration + LockMaxWaitTime time.Duration + LockRetries int } -// NewHiveOptions creates a new HiveOptions with default values. func NewHiveOptions() *HiveOptions { return &HiveOptions{ - props: iceberg.Properties{}, + props: iceberg.Properties{}, + LockMinWaitTime: DefaultLockCheckMinWaitTime, + LockMaxWaitTime: DefaultLockCheckMaxWaitTime, + LockRetries: DefaultLockCheckRetries, } } -// ApplyProperties applies properties from an iceberg.Properties map. func (o *HiveOptions) ApplyProperties(props iceberg.Properties) { o.props = props @@ -68,9 +85,25 @@ func (o *HiveOptions) ApplyProperties(props iceberg.Properties) { if props.GetBool(KerberosAuth, false) { o.KerberosAuth = true } + + // Parse lock configuration + if val, ok := props[LockCheckMinWaitTime]; ok { + if d, err := time.ParseDuration(val); err == nil { + o.LockMinWaitTime = d + } + } + if val, ok := props[LockCheckMaxWaitTime]; ok { + if d, err := time.ParseDuration(val); err == nil { + o.LockMaxWaitTime = d + } + } + if val, ok := props[LockCheckRetries]; ok { + if i, err := strconv.Atoi(val); err == nil { + o.LockRetries = i + } + } } -// Option is a functional option for configuring the Hive catalog. type Option func(*HiveOptions) // WithURI sets the Thrift URI for the Hive Metastore. @@ -80,21 +113,18 @@ func WithURI(uri string) Option { } } -// WithWarehouse sets the default warehouse location. func WithWarehouse(warehouse string) Option { return func(o *HiveOptions) { o.Warehouse = warehouse } } -// WithKerberosAuth enables Kerberos authentication. func WithKerberosAuth(enabled bool) Option { return func(o *HiveOptions) { o.KerberosAuth = enabled } } -// WithProperties sets additional properties for the catalog. func WithProperties(props iceberg.Properties) Option { return func(o *HiveOptions) { o.props = props diff --git a/catalog/hive/schema.go b/catalog/hive/schema.go index ea1891d4f..f0b3880df 100644 --- a/catalog/hive/schema.go +++ b/catalog/hive/schema.go @@ -26,7 +26,6 @@ import ( "github.com/beltran/gohive/hive_metastore" ) -// schemaToHiveColumns converts an Iceberg schema to Hive FieldSchema columns. func schemaToHiveColumns(schema *iceberg.Schema) []*hive_metastore.FieldSchema { columns := make([]*hive_metastore.FieldSchema, 0, len(schema.Fields())) for _, field := range schema.Fields() { @@ -36,7 +35,6 @@ func schemaToHiveColumns(schema *iceberg.Schema) []*hive_metastore.FieldSchema { return columns } -// fieldToHiveColumn converts an Iceberg NestedField to a Hive FieldSchema. 
func fieldToHiveColumn(field iceberg.NestedField) *hive_metastore.FieldSchema { return &hive_metastore.FieldSchema{ Name: field.Name, @@ -103,7 +101,6 @@ func icebergTypeToHiveType(typ iceberg.Type) string { } } -// constructHiveTable creates a Hive Table struct for an Iceberg table. func constructHiveTable(dbName, tableName, location, metadataLocation string, schema *iceberg.Schema, props map[string]string) *hive_metastore.Table { parameters := make(map[string]string) @@ -137,7 +134,6 @@ func constructHiveTable(dbName, tableName, location, metadataLocation string, sc } } -// updateHiveTableForCommit updates a Hive Table struct for a commit operation. func updateHiveTableForCommit(existing *hive_metastore.Table, newMetadataLocation string) *hive_metastore.Table { // Copy the existing table updated := *existing @@ -172,7 +168,6 @@ func isIcebergTable(tbl *hive_metastore.Table) bool { return strings.EqualFold(tableType, TableTypeIceberg) } -// getMetadataLocation returns the metadata location from a Hive table. func getMetadataLocation(tbl *hive_metastore.Table) (string, error) { if tbl == nil || tbl.Parameters == nil { return "", errors.New("table has no parameters") From 51701f1b641ccfd1514724c0272e3e436c7e888b Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Thu, 22 Jan 2026 18:13:06 -0600 Subject: [PATCH 7/7] fixed linter errors --- catalog/hive/lock.go | 127 ++++++++++++++++ catalog/hive/lock_test.go | 302 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 429 insertions(+) create mode 100644 catalog/hive/lock.go create mode 100644 catalog/hive/lock_test.go diff --git a/catalog/hive/lock.go b/catalog/hive/lock.go new file mode 100644 index 000000000..e3c6c24b6 --- /dev/null +++ b/catalog/hive/lock.go @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package hive + +import ( + "context" + "errors" + "fmt" + "math" + "time" + + "github.com/beltran/gohive/hive_metastore" +) + +// ErrLockAcquisitionFailed is returned when a lock cannot be acquired after all retries. 
+var ErrLockAcquisitionFailed = errors.New("failed to acquire lock") + +type HiveLock struct { + client HiveClient + lockId int64 +} + +func acquireLock(ctx context.Context, client HiveClient, database, tableName string, opts *HiveOptions) (*HiveLock, error) { + lockReq := &hive_metastore.LockRequest{ + Component: []*hive_metastore.LockComponent{{ + Type: hive_metastore.LockType_EXCLUSIVE, + Level: hive_metastore.LockLevel_TABLE, + Dbname: database, + Tablename: &tableName, + }}, + } + + lockResp, err := client.Lock(ctx, lockReq) + if err != nil { + return nil, fmt.Errorf("failed to request lock: %w", err) + } + + if lockResp.State == hive_metastore.LockState_ACQUIRED { + return &HiveLock{ + client: client, + lockId: lockResp.Lockid, + }, nil + } + + // If not acquired immediately, wait and retry + for attempt := 0; attempt < opts.LockRetries; attempt++ { + // Wait before checking again + waitTime := calculateBackoff(attempt, opts.LockMinWaitTime, opts.LockMaxWaitTime) + + select { + case <-ctx.Done(): + _ = client.Unlock(ctx, lockResp.Lockid) + + return nil, ctx.Err() + case <-time.After(waitTime): + } + + // Check lock state + checkResp, err := client.CheckLock(ctx, lockResp.Lockid) + if err != nil { + _ = client.Unlock(ctx, lockResp.Lockid) + + return nil, fmt.Errorf("failed to check lock status: %w", err) + } + + switch checkResp.State { + case hive_metastore.LockState_ACQUIRED: + return &HiveLock{ + client: client, + lockId: lockResp.Lockid, + }, nil + case hive_metastore.LockState_WAITING: + // Continue waiting + continue + case hive_metastore.LockState_ABORT: + return nil, fmt.Errorf("%w: lock was aborted", ErrLockAcquisitionFailed) + case hive_metastore.LockState_NOT_ACQUIRED: + return nil, fmt.Errorf("%w: lock not acquired", ErrLockAcquisitionFailed) + default: + return nil, fmt.Errorf("%w: unexpected lock state: %v", ErrLockAcquisitionFailed, checkResp.State) + } + } + + _ = client.Unlock(ctx, lockResp.Lockid) + + return nil, fmt.Errorf("%w: exhausted %d retries for table %s.%s", ErrLockAcquisitionFailed, opts.LockRetries, database, tableName) +} + +func calculateBackoff(attempt int, minWait, maxWait time.Duration) time.Duration { + wait := time.Duration(float64(minWait) * math.Pow(2, float64(attempt))) + if wait > maxWait { + wait = maxWait + } + + return wait +} + +func (l *HiveLock) Release(ctx context.Context) error { + if l == nil { + return nil + } + + return l.client.Unlock(ctx, l.lockId) +} + +func (l *HiveLock) LockID() int64 { + if l == nil { + return 0 + } + + return l.lockId +} diff --git a/catalog/hive/lock_test.go b/catalog/hive/lock_test.go new file mode 100644 index 000000000..6272f5c9e --- /dev/null +++ b/catalog/hive/lock_test.go @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
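
Before the tests for this locking code, a short worked example of the wait schedule produced by `calculateBackoff` above. `backoff` is a standalone restatement of the same arithmetic using the default wait times from options.go, not a call into the package:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoff restates the arithmetic of calculateBackoff in lock.go:
// exponential growth from minWait, capped at maxWait.
func backoff(attempt int, minWait, maxWait time.Duration) time.Duration {
	wait := time.Duration(float64(minWait) * math.Pow(2, float64(attempt)))
	if wait > maxWait {
		wait = maxWait
	}

	return wait
}

func main() {
	// With the defaults from options.go (100ms minimum, 60s maximum),
	// successive lock checks wait 100ms, 200ms, 400ms, 800ms, ...
	for attempt := 0; attempt < 4; attempt++ {
		fmt.Println(backoff(attempt, 100*time.Millisecond, 60*time.Second))
	}
}
```
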
+ +package hive + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/beltran/gohive/hive_metastore" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestAcquireLockImmediateSuccess(t *testing.T) { + mockClient := new(mockHiveClient) + ctx := context.Background() + opts := NewHiveOptions() + + mockClient.On("Lock", ctx, mock.AnythingOfType("*hive_metastore.LockRequest")). + Return(&hive_metastore.LockResponse{ + Lockid: 123, + State: hive_metastore.LockState_ACQUIRED, + }, nil) + + lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts) + + require.NoError(t, err) + require.NotNil(t, lock) + assert.Equal(t, int64(123), lock.LockID()) + + mockClient.AssertExpectations(t) +} + +func TestAcquireLockWithRetry(t *testing.T) { + mockClient := new(mockHiveClient) + ctx := context.Background() + opts := NewHiveOptions() + opts.LockMinWaitTime = 1 * time.Millisecond // Fast retries for testing + opts.LockMaxWaitTime = 10 * time.Millisecond + opts.LockRetries = 3 + + // Lock request returns WAITING initially + mockClient.On("Lock", ctx, mock.AnythingOfType("*hive_metastore.LockRequest")). + Return(&hive_metastore.LockResponse{ + Lockid: 456, + State: hive_metastore.LockState_WAITING, + }, nil) + + // First CheckLock returns WAITING + mockClient.On("CheckLock", ctx, int64(456)). + Return(&hive_metastore.LockResponse{ + Lockid: 456, + State: hive_metastore.LockState_WAITING, + }, nil).Once() + + // Second CheckLock returns ACQUIRED + mockClient.On("CheckLock", ctx, int64(456)). + Return(&hive_metastore.LockResponse{ + Lockid: 456, + State: hive_metastore.LockState_ACQUIRED, + }, nil).Once() + + lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts) + + require.NoError(t, err) + require.NotNil(t, lock) + assert.Equal(t, int64(456), lock.LockID()) + + mockClient.AssertExpectations(t) +} + +func TestAcquireLockExhaustsRetries(t *testing.T) { + mockClient := new(mockHiveClient) + ctx := context.Background() + opts := NewHiveOptions() + opts.LockMinWaitTime = 1 * time.Millisecond + opts.LockMaxWaitTime = 10 * time.Millisecond + opts.LockRetries = 2 + + // Lock request returns WAITING + mockClient.On("Lock", ctx, mock.AnythingOfType("*hive_metastore.LockRequest")). + Return(&hive_metastore.LockResponse{ + Lockid: 789, + State: hive_metastore.LockState_WAITING, + }, nil) + + // All CheckLock calls return WAITING + mockClient.On("CheckLock", ctx, int64(789)). + Return(&hive_metastore.LockResponse{ + Lockid: 789, + State: hive_metastore.LockState_WAITING, + }, nil) + + mockClient.On("Unlock", ctx, int64(789)).Return(nil) + + lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts) + + require.Error(t, err) + require.Nil(t, lock) + assert.ErrorIs(t, err, ErrLockAcquisitionFailed) + assert.Contains(t, err.Error(), "exhausted 2 retries") + + mockClient.AssertExpectations(t) +} + +func TestAcquireLockAborted(t *testing.T) { + mockClient := new(mockHiveClient) + ctx := context.Background() + opts := NewHiveOptions() + opts.LockMinWaitTime = 1 * time.Millisecond + + // Lock request returns WAITING + mockClient.On("Lock", ctx, mock.AnythingOfType("*hive_metastore.LockRequest")). + Return(&hive_metastore.LockResponse{ + Lockid: 111, + State: hive_metastore.LockState_WAITING, + }, nil) + + // CheckLock returns ABORT + mockClient.On("CheckLock", ctx, int64(111)). 
+ Return(&hive_metastore.LockResponse{ + Lockid: 111, + State: hive_metastore.LockState_ABORT, + }, nil) + + lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts) + + require.Error(t, err) + require.Nil(t, lock) + assert.ErrorIs(t, err, ErrLockAcquisitionFailed) + assert.Contains(t, err.Error(), "aborted") + + mockClient.AssertExpectations(t) +} + +func TestAcquireLockRequestFails(t *testing.T) { + mockClient := new(mockHiveClient) + ctx := context.Background() + opts := NewHiveOptions() + + // Lock request fails + mockClient.On("Lock", ctx, mock.AnythingOfType("*hive_metastore.LockRequest")). + Return(nil, errors.New("connection failed")) + + lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts) + + require.Error(t, err) + require.Nil(t, lock) + assert.Contains(t, err.Error(), "failed to request lock") + + mockClient.AssertExpectations(t) +} + +func TestAcquireLockCheckFails(t *testing.T) { + mockClient := new(mockHiveClient) + ctx := context.Background() + opts := NewHiveOptions() + opts.LockMinWaitTime = 1 * time.Millisecond + + // Lock request returns WAITING + mockClient.On("Lock", ctx, mock.AnythingOfType("*hive_metastore.LockRequest")). + Return(&hive_metastore.LockResponse{ + Lockid: 222, + State: hive_metastore.LockState_WAITING, + }, nil) + + // CheckLock fails + mockClient.On("CheckLock", ctx, int64(222)). + Return(nil, errors.New("check failed")) + + // Lock should be released on error + mockClient.On("Unlock", ctx, int64(222)).Return(nil) + + lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts) + + require.Error(t, err) + require.Nil(t, lock) + assert.Contains(t, err.Error(), "failed to check lock status") + + mockClient.AssertExpectations(t) +} + +func TestAcquireLockContextCancelled(t *testing.T) { + mockClient := new(mockHiveClient) + ctx, cancel := context.WithCancel(context.Background()) + opts := NewHiveOptions() + opts.LockMinWaitTime = 100 * time.Millisecond // Longer wait so we can cancel + + // Lock request returns WAITING + mockClient.On("Lock", ctx, mock.AnythingOfType("*hive_metastore.LockRequest")). 
+ Return(&hive_metastore.LockResponse{ + Lockid: 333, + State: hive_metastore.LockState_WAITING, + }, nil) + + // Lock should be released when context is cancelled + mockClient.On("Unlock", ctx, int64(333)).Return(nil) + + // Cancel context before the wait completes + go func() { + time.Sleep(10 * time.Millisecond) + cancel() + }() + + lock, err := acquireLock(ctx, mockClient, "testdb", "testtable", opts) + + require.Error(t, err) + require.Nil(t, lock) + assert.ErrorIs(t, err, context.Canceled) + + mockClient.AssertExpectations(t) +} + +func TestReleaseLock(t *testing.T) { + mockClient := new(mockHiveClient) + ctx := context.Background() + + lock := &HiveLock{ + client: mockClient, + lockId: 999, + } + + mockClient.On("Unlock", ctx, int64(999)).Return(nil) + + err := lock.Release(ctx) + + require.NoError(t, err) + mockClient.AssertExpectations(t) +} + +func TestReleaseLockNil(t *testing.T) { + var lock *HiveLock = nil + + err := lock.Release(context.Background()) + + require.NoError(t, err) +} + +func TestCalculateBackoff(t *testing.T) { + minWait := 100 * time.Millisecond + maxWait := 1 * time.Second + + tests := []struct { + attempt int + expected time.Duration + }{ + {0, 100 * time.Millisecond}, // 100ms * 2^0 = 100ms + {1, 200 * time.Millisecond}, // 100ms * 2^1 = 200ms + {2, 400 * time.Millisecond}, // 100ms * 2^2 = 400ms + {3, 800 * time.Millisecond}, // 100ms * 2^3 = 800ms + {4, 1 * time.Second}, // 100ms * 2^4 = 1.6s, capped at 1s + {10, 1 * time.Second}, // Capped at maxWait + } + + for _, tt := range tests { + t.Run("", func(t *testing.T) { + result := calculateBackoff(tt.attempt, minWait, maxWait) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestLockConfigurationParsing(t *testing.T) { + props := map[string]string{ + LockCheckMinWaitTime: "200ms", + LockCheckMaxWaitTime: "30s", + LockCheckRetries: "5", + } + + opts := NewHiveOptions() + opts.ApplyProperties(props) + + assert.Equal(t, 200*time.Millisecond, opts.LockMinWaitTime) + assert.Equal(t, 30*time.Second, opts.LockMaxWaitTime) + assert.Equal(t, 5, opts.LockRetries) +} + +func TestLockConfigurationDefaults(t *testing.T) { + opts := NewHiveOptions() + + assert.Equal(t, DefaultLockCheckMinWaitTime, opts.LockMinWaitTime) + assert.Equal(t, DefaultLockCheckMaxWaitTime, opts.LockMaxWaitTime) + assert.Equal(t, DefaultLockCheckRetries, opts.LockRetries) +}
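For reference, a minimal sketch of how the lock polling behaviour can be tuned through the exported options API. The keys and their duration/integer parsing match what TestLockConfigurationParsing exercises above; whether hive.NewCatalog forwards the same keys from its catalog properties is not shown in this patch and is only assumed.

```go
package main

import (
	"fmt"
	"math"
	"time"

	"github.com/apache/iceberg-go/catalog/hive"
)

func main() {
	// Tune lock polling through string properties, using the same keys the
	// configuration tests exercise.
	opts := hive.NewHiveOptions()
	opts.ApplyProperties(map[string]string{
		hive.LockCheckMinWaitTime: "200ms", // first poll after ~200ms
		hive.LockCheckMaxWaitTime: "30s",   // backoff is capped at 30s
		hive.LockCheckRetries:     "5",     // give up after 5 CheckLock polls
	})

	// Reproduce the capped exponential backoff schedule from calculateBackoff:
	// wait = min(LockMinWaitTime * 2^attempt, LockMaxWaitTime).
	for attempt := 0; attempt < opts.LockRetries; attempt++ {
		wait := time.Duration(float64(opts.LockMinWaitTime) * math.Pow(2, float64(attempt)))
		if wait > opts.LockMaxWaitTime {
			wait = opts.LockMaxWaitTime
		}
		fmt.Printf("attempt %d: wait %v\n", attempt, wait)
	}
}
```

Doubling the wait between CheckLock polls, up to the configured cap, keeps pressure on the metastore low while still acquiring the lock quickly when it is released early.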