diff --git a/table/metadata.go b/table/metadata.go index 794c58f8b..871b56cb0 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -1155,6 +1155,7 @@ func maxBy[S ~[]E, E any](elems S, extract func(e E) int) int { var ( ErrInvalidMetadataFormatVersion = errors.New("invalid or missing format-version in table metadata") ErrInvalidMetadata = errors.New("invalid metadata") + ErrPartitionSpecNotFound = errors.New("partition spec not found") ) // ParseMetadata parses json metadata provided by the passed in reader, diff --git a/table/scanner.go b/table/scanner.go index ee12bf9ef..67a534cfe 100644 --- a/table/scanner.go +++ b/table/scanner.go @@ -242,21 +242,30 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) { } func (scan *Scan) buildPartitionProjection(specID int) (iceberg.BooleanExpression, error) { - project := newInclusiveProjection(scan.metadata.CurrentSchema(), - scan.metadata.PartitionSpecs()[specID], true) + spec, err := getPartitionSpecByID(scan.metadata.PartitionSpecs(), specID) + if err != nil { + return nil, err + } + project := newInclusiveProjection(scan.metadata.CurrentSchema(), *spec, true) return project(scan.rowFilter) } func (scan *Scan) buildManifestEvaluator(specID int) (func(iceberg.ManifestFile) (bool, error), error) { - spec := scan.metadata.PartitionSpecs()[specID] + spec, err := getPartitionSpecByID(scan.metadata.PartitionSpecs(), specID) + if err != nil { + return nil, err + } - return newManifestEvaluator(spec, scan.metadata.CurrentSchema(), + return newManifestEvaluator(*spec, scan.metadata.CurrentSchema(), scan.partitionFilters.Get(specID), scan.caseSensitive) } -func (scan *Scan) buildPartitionEvaluator(specID int) func(iceberg.DataFile) (bool, error) { - spec := scan.metadata.PartitionSpecs()[specID] +func (scan *Scan) buildPartitionEvaluator(specID int) (func(iceberg.DataFile) (bool, error), error) { + spec, err := getPartitionSpecByID(scan.metadata.PartitionSpecs(), specID) + if err != nil { + return nil, err + } partType 
:= spec.PartitionType(scan.metadata.CurrentSchema()) partSchema := iceberg.NewSchema(0, partType.FieldList...) partExpr := scan.partitionFilters.Get(specID) @@ -268,7 +277,7 @@ func (scan *Scan) buildPartitionEvaluator(specID int) func(iceberg.DataFile) (bo } return fn(getPartitionRecord(d, partType)) - } + }, nil } func (scan *Scan) checkSequenceNumber(minSeqNum int64, manifest iceberg.ManifestFile) bool { @@ -370,7 +379,7 @@ func (scan *Scan) collectManifestEntries( g, _ := errgroup.WithContext(ctx) g.SetLimit(concurrencyLimit) - partitionEvaluators := newKeyDefaultMap(scan.buildPartitionEvaluator) + partitionEvaluators := newKeyDefaultMapWrapErr(scan.buildPartitionEvaluator) for _, mf := range manifestList { if !scan.checkSequenceNumber(minSeqNum, mf) { diff --git a/table/scanner_internal_test.go b/table/scanner_internal_test.go index af08b687e..07ca97000 100644 --- a/table/scanner_internal_test.go +++ b/table/scanner_internal_test.go @@ -25,6 +25,7 @@ import ( "github.com/apache/iceberg-go" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func newDataManifest(minSeqNum int64) iceberg.ManifestFile { @@ -135,3 +136,98 @@ func TestKeyDefaultMapRaceCondition(t *testing.T) { assert.Equal(t, int64(1), callCount, "factory should be called exactly once per key, but was called %d times", callCount) } + +func TestBuildPartitionProjectionWithInvalidSpecID(t *testing.T) { + schema := iceberg.NewSchema( + 1, + iceberg.NestedField{ + ID: 1, Name: "id", + Type: iceberg.PrimitiveTypes.Int64, Required: true, + }, + ) + + metadata, err := NewMetadata( + schema, + iceberg.UnpartitionedSpec, + UnsortedSortOrder, + "s3://test-bucket/test_table", + iceberg.Properties{}, + ) + require.NoError(t, err) + + scan := &Scan{ + metadata: metadata, + rowFilter: iceberg.AlwaysTrue{}, + caseSensitive: true, + } + + expr, err := scan.buildPartitionProjection(999) + require.Error(t, err) + assert.Nil(t, expr) + assert.ErrorIs(t, err, ErrPartitionSpecNotFound) + 
assert.ErrorContains(t, err, "id 999") +} + +func TestBuildManifestEvaluatorWithInvalidSpecID(t *testing.T) { + schema := iceberg.NewSchema( + 1, + iceberg.NestedField{ + ID: 1, Name: "id", + Type: iceberg.PrimitiveTypes.Int64, Required: true, + }, + ) + + metadata, err := NewMetadata( + schema, + iceberg.UnpartitionedSpec, + UnsortedSortOrder, + "s3://test-bucket/test_table", + iceberg.Properties{}, + ) + require.NoError(t, err) + + scan := &Scan{ + metadata: metadata, + rowFilter: iceberg.AlwaysTrue{}, + caseSensitive: true, + } + + scan.partitionFilters = newKeyDefaultMapWrapErr(scan.buildPartitionProjection) + + evaluator, err := scan.buildManifestEvaluator(999) + require.Error(t, err) + assert.Nil(t, evaluator) + assert.ErrorIs(t, err, ErrPartitionSpecNotFound) + assert.ErrorContains(t, err, "id 999") +} + +func TestBuildPartitionEvaluatorWithInvalidSpecID(t *testing.T) { + schema := iceberg.NewSchema( + 1, + iceberg.NestedField{ + ID: 1, Name: "id", + Type: iceberg.PrimitiveTypes.Int64, Required: true, + }, + ) + + metadata, err := NewMetadata( + schema, + iceberg.UnpartitionedSpec, + UnsortedSortOrder, + "s3://test-bucket/test_table", + iceberg.Properties{}, + ) + require.NoError(t, err) + + scan := &Scan{ + metadata: metadata, + rowFilter: iceberg.AlwaysTrue{}, + caseSensitive: true, + } + + evaluator, err := scan.buildPartitionEvaluator(999) + require.Error(t, err) + assert.Nil(t, evaluator) + assert.ErrorIs(t, err, ErrPartitionSpecNotFound) + assert.ErrorContains(t, err, "id 999") +} diff --git a/table/utils.go b/table/utils.go new file mode 100644 index 000000000..0a36f4134 --- /dev/null +++ b/table/utils.go @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+	"fmt"
+	"slices"
+
+	"github.com/apache/iceberg-go"
+)
+
+// getPartitionSpecByID returns a pointer to the first element of specs whose
+// ID() equals specID, or an error wrapping ErrPartitionSpecNotFound if none does.
+func getPartitionSpecByID(specs []iceberg.PartitionSpec, specID int) (*iceberg.PartitionSpec, error) {
+	hasID := func(s iceberg.PartitionSpec) bool { return s.ID() == specID }
+	if idx := slices.IndexFunc(specs, hasID); idx >= 0 {
+		return &specs[idx], nil
+	}
+
+	return nil, fmt.Errorf("%w: id %d", ErrPartitionSpecNotFound, specID)
+}