Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions catalog/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, pr

ctx = utils.WithAwsConfig(ctx, c.awsCfg)
// TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog.
iofs, err := io.LoadFS(ctx, props, location)
iofs, err := io.Load(ctx, props, location)
if err != nil {
return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err)
}
Expand Down Expand Up @@ -313,7 +313,7 @@ func (c *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier
}
// Load the metadata file to get table properties
ctx = utils.WithAwsConfig(ctx, c.awsCfg)
iofs, err := io.LoadFS(ctx, nil, metadataLocation)
iofs, err := io.Load(ctx, nil, metadataLocation)
if err != nil {
return nil, fmt.Errorf("failed to load metadata file at %s: %w", metadataLocation, err)
}
Expand Down
6 changes: 3 additions & 3 deletions catalog/internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func WriteTableMetadata(metadata table.Metadata, fs io.WriteFileIO, loc string)
}

func WriteMetadata(ctx context.Context, metadata table.Metadata, loc string, props iceberg.Properties) error {
fs, err := io.LoadFS(ctx, props, loc)
fs, err := io.Load(ctx, props, loc)
if err != nil {
return err
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func CreateStagedTable(ctx context.Context, catprops iceberg.Properties, nsprops

ioProps := maps.Clone(catprops)
maps.Copy(ioProps, cfg.Properties)
fs, err := io.LoadFS(ctx, ioProps, metadataLoc)
fs, err := io.Load(ctx, ioProps, metadataLoc)
if err != nil {
return table.StagedTable{}, err
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func UpdateAndStageTable(ctx context.Context, current *table.Table, ident table.
return nil, err
}

fs, err := io.LoadFS(ctx, updated.Properties(), newLocation)
fs, err := io.Load(ctx, updated.Properties(), newLocation)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion catalog/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ func checkValidNamespace(ident table.Identifier) error {
}

func (r *Catalog) tableFromResponse(ctx context.Context, identifier []string, metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
iofs, err := iceio.LoadFS(ctx, config, loc)
iofs, err := iceio.Load(ctx, config, loc)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion catalog/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, pr
tblProps := maps.Clone(c.props)
maps.Copy(props, tblProps)

iofs, err := io.LoadFS(ctx, tblProps, result.MetadataLocation.String)
iofs, err := io.Load(ctx, tblProps, result.MetadataLocation.String)
if err != nil {
return nil, err
}
Expand Down
27 changes: 27 additions & 0 deletions io/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,30 @@

return azureblob.OpenBucket(ctx, client, nil)
}

func init() {
azureRegistrar := RegistrarFunc(func(ctx context.Context, props map[string]string) (IO, error) {
// We need a warehouse location to extract the bucket name
location := props["warehouse"]
if location == "" {
return nil, fmt.Errorf("warehouse location required for Azure IO")

Check failure on line 120 in io/azure.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

error-format: fmt.Errorf can be replaced with errors.New (perfsprint)

Check failure on line 120 in io/azure.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

error-format: fmt.Errorf can be replaced with errors.New (perfsprint)

Check failure on line 120 in io/azure.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

error-format: fmt.Errorf can be replaced with errors.New (perfsprint)
}

parsed, err := url.Parse(location)
if err != nil {
return nil, fmt.Errorf("failed to parse Azure location: %w", err)
}

bucket, err := createAzureBucket(ctx, parsed, props)
if err != nil {
return nil, err
}

return createBlobFS(ctx, bucket, parsed.Host), nil
})

Register("abfs", azureRegistrar)
Register("abfss", azureRegistrar)
Register("wasb", azureRegistrar)
Register("wasbs", azureRegistrar)
}
23 changes: 23 additions & 0 deletions io/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import (
"context"
"fmt"
"net/url"

"gocloud.dev/blob"
Expand Down Expand Up @@ -63,3 +64,25 @@

return bucket, nil
}

func init() {
Register("gs", RegistrarFunc(func(ctx context.Context, props map[string]string) (IO, error) {
// We need a warehouse location to extract the bucket name
location := props["warehouse"]
if location == "" {
return nil, fmt.Errorf("warehouse location required for GCS IO")

Check failure on line 73 in io/gcs.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

error-format: fmt.Errorf can be replaced with errors.New (perfsprint)

Check failure on line 73 in io/gcs.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

error-format: fmt.Errorf can be replaced with errors.New (perfsprint)

Check failure on line 73 in io/gcs.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

error-format: fmt.Errorf can be replaced with errors.New (perfsprint)
}

parsed, err := url.Parse(location)
if err != nil {
return nil, fmt.Errorf("failed to parse GCS location: %w", err)
}

bucket, err := createGCSBucket(ctx, parsed, props)
if err != nil {
return nil, err
}

return createBlobFS(ctx, bucket, parsed.Host), nil
}))
}
66 changes: 0 additions & 66 deletions io/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,10 @@
package io

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"net/url"
"strings"

"gocloud.dev/blob"
"gocloud.dev/blob/memblob"
)

// IO is an interface to a hierarchical file system.
Expand Down Expand Up @@ -234,63 +228,3 @@ func (f ioFile) ReadDir(count int) ([]fs.DirEntry, error) {

return d.ReadDir(count)
}

func inferFileIOFromSchema(ctx context.Context, path string, props map[string]string) (IO, error) {
parsed, err := url.Parse(path)
if err != nil {
return nil, err
}
var bucket *blob.Bucket

switch parsed.Scheme {
case "s3", "s3a", "s3n":
bucket, err = createS3Bucket(ctx, parsed, props)
if err != nil {
return nil, err
}
case "gs":
bucket, err = createGCSBucket(ctx, parsed, props)
if err != nil {
return nil, err
}
case "mem":
// memblob doesn't use the URL host or path
bucket = memblob.OpenBucket(nil)
case "file", "":
return LocalFS{}, nil
case "abfs", "abfss", "wasb", "wasbs":
bucket, err = createAzureBucket(ctx, parsed, props)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("IO for file '%s' not implemented", path)
}

return createBlobFS(ctx, bucket, parsed.Host), nil
}

// LoadFS takes a map of properties and an optional URI location
// and attempts to infer an IO object from it.
//
// A schema of "file://" or an empty string will result in a LocalFS
// implementation. Otherwise this will return an error if the schema
// does not yet have an implementation here.
//
// Currently local, S3, GCS, and In-Memory FSs are implemented.
func LoadFS(ctx context.Context, props map[string]string, location string) (IO, error) {
if location == "" {
location = props["warehouse"]
}

iofs, err := inferFileIOFromSchema(ctx, location, props)
if err != nil {
return nil, err
}

if iofs == nil {
iofs = LocalFS{}
}

return iofs, nil
}
10 changes: 10 additions & 0 deletions io/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io

import (
"context"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -47,3 +48,12 @@ func (LocalFS) WriteFile(name string, content []byte) error {
func (LocalFS) Remove(name string) error {
return os.Remove(strings.TrimPrefix(name, "file://"))
}

func init() {
Register("file", RegistrarFunc(func(ctx context.Context, props map[string]string) (IO, error) {
return LocalFS{}, nil
}))
Register("", RegistrarFunc(func(ctx context.Context, props map[string]string) (IO, error) {
return LocalFS{}, nil
}))
}
32 changes: 32 additions & 0 deletions io/mem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package io

import (
"context"

"gocloud.dev/blob/memblob"
)

func init() {
Register("mem", RegistrarFunc(func(ctx context.Context, props map[string]string) (IO, error) {
// memblob doesn't use the URL host or path
bucket := memblob.OpenBucket(nil)
return createBlobFS(ctx, bucket, ""), nil

Check failure on line 30 in io/mem.go

View workflow job for this annotation

GitHub Actions / windows-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 30 in io/mem.go

View workflow job for this annotation

GitHub Actions / macos-latest go1.23.6

return with no blank line before (nlreturn)

Check failure on line 30 in io/mem.go

View workflow job for this annotation

GitHub Actions / ubuntu-latest go1.23.6

return with no blank line before (nlreturn)
}))
}
Loading
Loading