From 119462c4a19d6a1a7a43ad7583e0856a197333bb Mon Sep 17 00:00:00 2001 From: Rener Castro Date: Tue, 16 Dec 2025 01:13:22 -0300 Subject: [PATCH 1/8] feat: add Avro Object Container File (OCF) encoding and decoding with related reader modifications. --- ocf/ocf.go | 86 +++++++++++++++----- ocf/ocf_test.go | 131 ++++++++++++++++++++++++++++++ reader.go | 10 +++ reader_skip.go | 78 ++++++++++++++++++ reader_skip_test.go | 188 ++++++++++++++------------------------------ 5 files changed, 345 insertions(+), 148 deletions(-) diff --git a/ocf/ocf.go b/ocf/ocf.go index 09d7a93..1a7d535 100644 --- a/ocf/ocf.go +++ b/ocf/ocf.go @@ -59,6 +59,20 @@ type decoderConfig struct { CodecOptions codecOptions } +func newDecoderConfig(opts ...DecoderFunc) *decoderConfig { + cfg := decoderConfig{ + DecoderConfig: avro.DefaultConfig, + SchemaCache: avro.DefaultSchemaCache, + CodecOptions: codecOptions{ + DeflateCompressionLevel: flate.DefaultCompression, + }, + } + for _, opt := range opts { + opt(&cfg) + } + return &cfg +} + // DecoderFunc represents a configuration function for Decoder. type DecoderFunc func(cfg *decoderConfig) @@ -96,23 +110,16 @@ type Decoder struct { codec Codec count int64 + size int64 + n int64 } // NewDecoder returns a new decoder that reads from reader r. func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) { - cfg := decoderConfig{ - DecoderConfig: avro.DefaultConfig, - SchemaCache: avro.DefaultSchemaCache, - CodecOptions: codecOptions{ - DeflateCompressionLevel: flate.DefaultCompression, - }, - } - for _, opt := range opts { - opt(&cfg) - } - reader := avro.NewReader(r, 1024) + cfg := newDecoderConfig(opts...) + h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions) if err != nil { return nil, fmt.Errorf("decoder: %w", err) @@ -131,6 +138,31 @@ func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) { }, nil } +// NewDecoder returns a new decoder that reads from reader r. +func NewDecoderWithHeader(r *avro.Reader, h *OCFHeader, opts ...DecoderFunc) (*Decoder, error) { + cfg := newDecoderConfig(opts...) + decReader := bytesx.NewResetReader([]byte{}) + return &Decoder{ + reader: r, + resetReader: decReader, + decoder: cfg.DecoderConfig.NewDecoder(h.Schema, decReader), + meta: h.Meta, + sync: h.Sync, + codec: h.Codec, + schema: h.Schema, + }, nil +} + +// DecodeHeader reads and decodes the Avro container file header from r. +func DecodeHeader(r *avro.Reader, opts ...DecoderFunc) (*OCFHeader, error) { + cfg := newDecoderConfig(opts...) + h, err := readHeader(r, cfg.SchemaCache, cfg.CodecOptions) + if err != nil { + return nil, fmt.Errorf("decoder: %w", err) + } + return h, nil +} + // Metadata returns the header metadata. func (d *Decoder) Metadata() map[string][]byte { return d.meta @@ -145,8 +177,8 @@ func (d *Decoder) Schema() avro.Schema { // HasNext determines if there is another value to read. 
func (d *Decoder) HasNext() bool { if d.count <= 0 { - count := d.readBlock() - d.count = count + d.count, d.size = d.readBlock() + d.n = d.count } if d.reader.Error != nil { @@ -184,11 +216,27 @@ func (d *Decoder) Close() error { return nil } -func (d *Decoder) readBlock() int64 { +type BlockStatus struct { + Current int64 + Count int64 + Size int64 + Offset int64 +} + +func (d *Decoder) BlockStatus() *BlockStatus { + return &BlockStatus{ + Current: d.n - d.count + 1, + Count: d.n, + Size: d.size, + Offset: d.reader.InputOffset(), + } +} + +func (d *Decoder) readBlock() (int64, int64) { _ = d.reader.Peek() if errors.Is(d.reader.Error, io.EOF) { // There is no next block - return 0 + return 0, 0 } count := d.reader.ReadLong() @@ -220,7 +268,7 @@ func (d *Decoder) readBlock() int64 { d.reader.Error = errors.New("decoder: invalid block") } - return count + return count, size } type encoderConfig struct { @@ -530,14 +578,14 @@ func (e *Encoder) writerBlock() error { return e.writer.Flush() } -type ocfHeader struct { +type OCFHeader struct { Schema avro.Schema Codec Codec Meta map[string][]byte Sync [16]byte } -func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache, codecOpts codecOptions) (*ocfHeader, error) { +func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache, codecOpts codecOptions) (*OCFHeader, error) { var h Header reader.ReadVal(HeaderSchema, &h) if reader.Error != nil { @@ -557,7 +605,7 @@ func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache, codecOpts co return nil, err } - return &ocfHeader{ + return &OCFHeader{ Schema: schema, Codec: codec, Meta: h.Meta, diff --git a/ocf/ocf_test.go b/ocf/ocf_test.go index b22e6dc..8d1baf7 100644 --- a/ocf/ocf_test.go +++ b/ocf/ocf_test.go @@ -6,9 +6,12 @@ import ( "encoding/json" "errors" "flag" + "fmt" "io" "os" + "sort" "strings" + "sync" "testing" "github.com/hamba/avro/v2" @@ -1312,3 +1315,131 @@ type errorHeaderWriter struct{} func (*errorHeaderWriter) Write(p []byte) (int, error) { return 0, errors.New("test") } + +func TestConcurrentDecode(t *testing.T) { + // build an in-memory OCF with many records + unionStr := "union value" + base := FullRecord{ + Strings: []string{"s1", "s2"}, + Longs: []int64{1, 2}, + Enum: "A", + Map: map[string]int{"k": 1}, + Nullable: &unionStr, + Fixed: [16]byte{0x01, 0x02, 0x03}, + Record: &TestRecord{Long: 1, String: "r", Int: 0, Float: 1.1, Double: 2.2, Bool: true}, + } + + const total = 200 + buf := &bytes.Buffer{} + enc, err := ocf.NewEncoder(schema, buf, ocf.WithBlockLength(10)) + require.NoError(t, err) + for i := 0; i < total; i++ { + base.Record.Int = int32(i) + require.NoError(t, enc.Encode(base)) + } + require.NoError(t, enc.Close()) + + // decode header once + data := buf.Bytes() + r0 := avro.NewReader(bytes.NewReader(data), 1024) + hdr, err := ocf.DecodeHeader(r0) + require.NoError(t, err) + + // concurrency values to test; caller requirement: configurable concurrency + concs := []int64{1} + + // split file into parts by size and let workers align to sync using SkipTo + headerEnd := r0.InputOffset() + for _, conc := range concs { + t.Run(fmt.Sprintf("concurrency=%d", conc), func(t *testing.T) { + totalData := int64(len(data)) - headerEnd + partSize := totalData / int64(conc) + + recCh := make(chan FullRecord, total) + var wg sync.WaitGroup + + errCh := make(chan error, 1) + sendErr := func(err error) { + select { + case errCh <- err: + default: + } + } + + for i := int64(0); i < conc; i++ { + start := headerEnd + i*partSize + end := headerEnd + (i+1)*partSize + 
if i == conc-1 { + end = int64(len(data)) + } + + wg.Add(1) + go func(start, end int64) { + defer wg.Done() + r := avro.NewReader(bytes.NewReader(data[start:]), 1024) + skipped := int64(0) + // align to next sync marker unless starting at header end + if start != headerEnd { + n, err := r.SkipTo(hdr.Sync[:]) + if err != nil && !errors.Is(err, io.EOF) { + sendErr(err) + return + } + // if SkipTo advanced past our partition end, nothing to do + skipped = int64(n) + if start+skipped >= end { + return + } + } + + dec, err := ocf.NewDecoderWithHeader(r, hdr) + if err != nil { + sendErr(err) + return + } + defer dec.Close() + + for dec.HasNext() { + var rec FullRecord + if err := dec.Decode(&rec); err != nil { + sendErr(err) + return + } + recCh <- rec + bs := dec.BlockStatus() + if bs.Current > bs.Count { + if start+bs.Offset > end { + return + } + } + } + if err := dec.Error(); err != nil { + sendErr(err) + return + } + }(start, end) + } + + go func() { wg.Wait(); close(recCh) }() + + var got []int32 + for r := range recCh { + got = append(got, r.Record.Int) + } + + select { + case e := <-errCh: + if e != nil { + t.Fatalf("worker error: %v", e) + } + default: + } + + require.Equal(t, total, len(got), "unexpected number of records read") + sort.Slice(got, func(i, j int) bool { return got[i] < got[j] }) + for i := 0; i < total; i++ { + require.Equal(t, int32(i), got[i]) + } + }) + } +} diff --git a/reader.go b/reader.go index 3c11b18..a256bd7 100644 --- a/reader.go +++ b/reader.go @@ -31,6 +31,7 @@ type Reader struct { buf []byte head int tail int + offset int64 Error error } @@ -42,6 +43,7 @@ func NewReader(r io.Reader, bufSize int, opts ...ReaderFunc) *Reader { buf: make([]byte, bufSize), head: 0, tail: 0, + offset: 0, } for _, opt := range opts { @@ -57,6 +59,8 @@ func (r *Reader) Reset(b []byte) *Reader { r.buf = b r.head = 0 r.tail = len(b) + r.offset = 0 + r.Error = nil return r } @@ -90,6 +94,7 @@ func (r *Reader) loadMore() bool { continue } + r.offset += int64(r.tail) r.head = 0 r.tail = n return true @@ -322,3 +327,8 @@ func (r *Reader) ReadBlockHeader() (int64, int64) { return length, 0 } + +// InputOffset returns the current input offset of the Reader. +func (r *Reader) InputOffset() int64 { + return r.offset + int64(r.head) +} diff --git a/reader_skip.go b/reader_skip.go index 94288c8..16e23ae 100644 --- a/reader_skip.go +++ b/reader_skip.go @@ -1,5 +1,10 @@ package avro +import ( + "bytes" + "fmt" +) + // SkipNBytes skips the given number of bytes in the reader. func (r *Reader) SkipNBytes(n int) { read := 0 @@ -77,3 +82,76 @@ func (r *Reader) SkipBytes() { } r.SkipNBytes(int(size)) } + +// SkipTo skips to the given token in the reader. 
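+// It returns the total number of bytes consumed, including the token itself.
+// If the token is not found, SkipTo returns the number of bytes skipped along
+// with the reader's error, and it fails immediately when the token is longer
+// than the reader's internal buffer.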
+func (r *Reader) SkipTo(token []byte) (int, error) { + tokenLen := len(token) + if tokenLen == 0 { + return 0, nil + } + if tokenLen > len(r.buf) { + return 0, fmt.Errorf("token length %d exceeds buffer size %d", tokenLen, len(r.buf)) + } + + var skipped int + var stash []byte + + for { + // Check boundary if we have stash from previous read + if len(stash) > 0 { + need := tokenLen - 1 + if r.tail-r.head < need { + need = r.tail - r.head + } + + // Construct boundary window: stash + beginning of new buffer + boundary := make([]byte, len(stash)+need) + copy(boundary, stash) + copy(boundary[len(stash):], r.buf[r.head:r.head+need]) + + if idx := bytes.Index(boundary, token); idx >= 0 { + // Found in boundary + bytesToEndOfToken := idx + tokenLen + skipped += bytesToEndOfToken + + // Advance r.head by the number of bytes used from r.buf + bufferBytesConsumed := bytesToEndOfToken - len(stash) + r.head += bufferBytesConsumed + return skipped, nil + } + + // Not found in boundary, stash is definitely skipped + skipped += len(stash) + stash = nil + } + + // Search in current buffer + idx := bytes.Index(r.buf[r.head:r.tail], token) + if idx >= 0 { + advance := idx + tokenLen + r.head += advance + skipped += advance + return skipped, nil + } + + // Prepare stash for next iteration + available := r.tail - r.head + keep := tokenLen - 1 + if keep > available { + keep = available + } + + // Bytes that are definitely skipped (not kept in stash) + consumed := available - keep + skipped += consumed + + if keep > 0 { + stash = make([]byte, keep) + copy(stash, r.buf[r.tail-keep:r.tail]) + } + + if !r.loadMore() { + return skipped, r.Error + } + } +} diff --git a/reader_skip_test.go b/reader_skip_test.go index a821880..2610544 100644 --- a/reader_skip_test.go +++ b/reader_skip_test.go @@ -9,147 +9,77 @@ import ( "github.com/stretchr/testify/require" ) -func TestReader_SkipNBytes(t *testing.T) { - data := []byte{0x01, 0x01, 0x01, 0x36} - r := avro.NewReader(bytes.NewReader(data), 2) - - r.SkipNBytes(3) - - require.NoError(t, r.Error) - assert.Equal(t, int32(27), r.ReadInt()) -} - -func TestReader_SkipNBytesEOF(t *testing.T) { - data := []byte{0x01, 0x36} - r := avro.NewReader(bytes.NewReader(data), 2) - - r.SkipNBytes(3) - - assert.Error(t, r.Error) -} - -func TestReader_SkipBool(t *testing.T) { - data := []byte{0x01, 0x36} - r := avro.NewReader(bytes.NewReader(data), 10) - - r.SkipBool() - - require.NoError(t, r.Error) - assert.Equal(t, int32(27), r.ReadInt()) -} - -func TestReader_SkipInt(t *testing.T) { +func TestReader_SkipTo(t *testing.T) { tests := []struct { - name string - data []byte + name string + data []byte + bufSize int + token []byte + wantSkipped int + wantErr require.ErrorAssertionFunc }{ + // TokenInFirstBuffer { - name: "Skipped", - data: []byte{0x38, 0x36}, + name: "TokenInFirstBuffer", + data: []byte("abcdefgTOKENhij"), + bufSize: 1024, + token: []byte("TOKEN"), + wantSkipped: 12, // "abcdefgTOKEN" length + wantErr: require.NoError, }, + // TokenSplitAcrossBuffers { - name: "Overflow", - data: []byte{0xE2, 0xA2, 0xF3, 0xAD, 0xAD, 0x36}, + name: "TokenSplitAcrossBuffers", + data: append(append(make([]byte, 10), []byte("TO")...), []byte("KEN")...), + bufSize: 12, // 10 filler + "TO" = 12 bytes. Split happens exactly after "TO". 
+ token: []byte("TOKEN"), + wantSkipped: 15, // 10 filler + TOKEN + wantErr: require.NoError, + }, + // FalsePositiveSplit: XXKEN should NOT match TOKEN + { + name: "FalsePositiveSplit", + data: []byte("XXKEN"), + bufSize: 2, // Split "XX", "KEN" + token: []byte("TOKEN"), + wantSkipped: 0, + wantErr: require.Error, // Should fail to find TOKEN + }, + // TokenNotFound + { + name: "TokenNotFound", + data: []byte("abcdefg"), + bufSize: 1024, + token: []byte("XYZ"), + wantSkipped: 7, + wantErr: require.Error, // EOF causes error in SkipTo }, - } - - for _, test := range tests { - test := test - t.Run(test.name, func(t *testing.T) { - r := avro.NewReader(bytes.NewReader(test.data), 10) - - r.SkipInt() - - require.NoError(t, r.Error) - assert.Equal(t, int32(27), r.ReadInt()) - }) - } -} - -func TestReader_SkipLong(t *testing.T) { - tests := []struct { - name string - data []byte - }{ { - name: "Skipped", - data: []byte{0x38, 0x36}, + name: "EmptyToken", + data: []byte("abc"), + bufSize: 1024, + token: []byte{}, + wantSkipped: 0, + wantErr: require.NoError, }, { - name: "Overflow", - data: []byte{0xE2, 0xA2, 0xF3, 0xAD, 0xAD, 0xAD, 0xE2, 0xA2, 0xF3, 0xAD, 0x36}, + name: "PartialMatchAtEndButNotComplete", + data: []byte("abcTO"), + bufSize: 10, + token: []byte("TOKEN"), + wantSkipped: 5, + wantErr: require.Error, }, } - for _, test := range tests { - test := test - t.Run(test.name, func(t *testing.T) { - r := avro.NewReader(bytes.NewReader(test.data), 10) - - r.SkipLong() - - require.NoError(t, r.Error) - assert.Equal(t, int32(27), r.ReadInt()) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := avro.NewReader(bytes.NewReader(tt.data), tt.bufSize) + skipped, err := r.SkipTo(tt.token) + tt.wantErr(t, err) + if err == nil { + assert.Equal(t, tt.wantSkipped, skipped) + } }) } } - -func TestReader_SkipFloat(t *testing.T) { - data := []byte{0x00, 0x00, 0x00, 0x00, 0x36} - r := avro.NewReader(bytes.NewReader(data), 10) - - r.SkipFloat() - - require.NoError(t, r.Error) - assert.Equal(t, int32(27), r.ReadInt()) -} - -func TestReader_SkipDouble(t *testing.T) { - data := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x36} - r := avro.NewReader(bytes.NewReader(data), 10) - - r.SkipDouble() - - require.NoError(t, r.Error) - assert.Equal(t, int32(27), r.ReadInt()) -} - -func TestReader_SkipString(t *testing.T) { - data := []byte{0x06, 0x66, 0x6F, 0x6F, 0x36} - r := avro.NewReader(bytes.NewReader(data), 10) - - r.SkipString() - - require.NoError(t, r.Error) - assert.Equal(t, int32(27), r.ReadInt()) -} - -func TestReader_SkipStringEmpty(t *testing.T) { - data := []byte{0x00, 0x36} - r := avro.NewReader(bytes.NewReader(data), 10) - - r.SkipString() - - require.NoError(t, r.Error) - assert.Equal(t, int32(27), r.ReadInt()) -} - -func TestReader_SkipBytes(t *testing.T) { - data := []byte{0x06, 0x66, 0x6F, 0x6F, 0x36} - r := avro.NewReader(bytes.NewReader(data), 10) - - r.SkipBytes() - - require.NoError(t, r.Error) - assert.Equal(t, int32(27), r.ReadInt()) -} - -func TestReader_SkipBytesEmpty(t *testing.T) { - data := []byte{0x00, 0x36} - r := avro.NewReader(bytes.NewReader(data), 10) - - r.SkipBytes() - - require.NoError(t, r.Error) - assert.Equal(t, int32(27), r.ReadInt()) -} From e5cf86965b759fdacfcbac92e03605e534303ca6 Mon Sep 17 00:00:00 2001 From: Aryeh Klein Date: Tue, 16 Dec 2025 20:16:46 +0200 Subject: [PATCH 2/8] support reseting encoder to reduce load on GC. 
(#587) --- encoder.go | 5 + ocf/ocf.go | 37 +++++++ ocf/ocf_test.go | 262 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 304 insertions(+) diff --git a/encoder.go b/encoder.go index faa285e..49e3d7b 100644 --- a/encoder.go +++ b/encoder.go @@ -31,6 +31,11 @@ func (e *Encoder) Encode(v any) error { return e.w.Error } +// Reset resets the encoder to write to a new io.Writer. +func (e *Encoder) Reset(w io.Writer) { + e.w.Reset(w) +} + // Marshal returns the Avro encoding of v. func Marshal(schema Schema, v any) ([]byte, error) { return DefaultConfig.Marshal(schema, v) diff --git a/ocf/ocf.go b/ocf/ocf.go index 1a7d535..8f8f0af 100644 --- a/ocf/ocf.go +++ b/ocf/ocf.go @@ -381,6 +381,9 @@ type Encoder struct { blockLength int count int blockSize int + + // Stored for Reset. + header Header } // NewEncoder returns a new encoder that writes to w using schema s. @@ -434,6 +437,11 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e codec: h.Codec, blockLength: cfg.BlockLength, blockSize: cfg.BlockSize, + header: Header{ + Magic: magicBytes, + Meta: h.Meta, + Sync: h.Sync, + }, } return e, nil } @@ -475,6 +483,7 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e codec: codec, blockLength: cfg.BlockLength, blockSize: cfg.BlockSize, + header: header, } return e, nil } @@ -563,6 +572,34 @@ func (e *Encoder) Close() error { return err } +// Reset flushes any pending data, resets the encoder to write to a new io.Writer, +// and writes a fresh header with a new sync marker. The schema, codec, and other +// settings are preserved from the original encoder. +// This allows reusing the encoder for multiple files without reallocating buffers. +func (e *Encoder) Reset(w io.Writer) error { + if err := e.Flush(); err != nil { + return err + } + + // Generate new sync marker for the new file. + _, _ = rand.Read(e.header.Sync[:]) + e.sync = e.header.Sync + + // Reset writer to new output and write header. + e.writer.Reset(w) + e.writer.WriteVal(HeaderSchema, e.header) + if err := e.writer.Flush(); err != nil { + return err + } + + // Reset buffer and encoder. + e.buf.Reset() + e.encoder.Reset(e.buf) + e.count = 0 + + return nil +} + func (e *Encoder) writerBlock() error { e.writer.WriteLong(int64(e.count)) diff --git a/ocf/ocf_test.go b/ocf/ocf_test.go index 8d1baf7..3625dac 100644 --- a/ocf/ocf_test.go +++ b/ocf/ocf_test.go @@ -1443,3 +1443,265 @@ func TestConcurrentDecode(t *testing.T) { }) } } +// TestEncoder_Reset tests that Reset allows reusing encoder for multiple files. 
+func TestEncoder_Reset(t *testing.T) { + record1 := FullRecord{ + Strings: []string{"first", "record"}, + Longs: []int64{}, + Enum: "A", + Map: map[string]int{}, + Record: &TestRecord{Long: 1}, + } + record2 := FullRecord{ + Strings: []string{"second", "record"}, + Longs: []int64{}, + Enum: "B", + Map: map[string]int{}, + Record: &TestRecord{Long: 2}, + } + + // Create first file + buf1 := &bytes.Buffer{} + enc, err := ocf.NewEncoder(schema, buf1) + require.NoError(t, err) + + err = enc.Encode(record1) + require.NoError(t, err) + + err = enc.Close() + require.NoError(t, err) + + // Reset to write to second file + buf2 := &bytes.Buffer{} + err = enc.Reset(buf2) + require.NoError(t, err) + + err = enc.Encode(record2) + require.NoError(t, err) + + err = enc.Close() + require.NoError(t, err) + + // Verify first file + dec1, err := ocf.NewDecoder(buf1) + require.NoError(t, err) + + require.True(t, dec1.HasNext()) + var got1 FullRecord + err = dec1.Decode(&got1) + require.NoError(t, err) + assert.Equal(t, record1, got1) + require.False(t, dec1.HasNext()) + + // Verify second file + dec2, err := ocf.NewDecoder(buf2) + require.NoError(t, err) + + require.True(t, dec2.HasNext()) + var got2 FullRecord + err = dec2.Decode(&got2) + require.NoError(t, err) + assert.Equal(t, record2, got2) + require.False(t, dec2.HasNext()) +} + +// TestEncoder_ResetWithPendingData tests Reset flushes pending data. +func TestEncoder_ResetWithPendingData(t *testing.T) { + buf1 := &bytes.Buffer{} + enc, err := ocf.NewEncoder(`"long"`, buf1, ocf.WithBlockLength(10)) + require.NoError(t, err) + + // Write data but don't close (pending data) + err = enc.Encode(int64(42)) + require.NoError(t, err) + + // Reset should flush the pending data + buf2 := &bytes.Buffer{} + err = enc.Reset(buf2) + require.NoError(t, err) + + // Verify first file has the data + dec1, err := ocf.NewDecoder(buf1) + require.NoError(t, err) + + require.True(t, dec1.HasNext()) + var got int64 + err = dec1.Decode(&got) + require.NoError(t, err) + assert.Equal(t, int64(42), got) +} + +// TestEncoder_ResetGeneratesNewSyncMarker tests that each reset creates a new sync marker. +func TestEncoder_ResetGeneratesNewSyncMarker(t *testing.T) { + buf1 := &bytes.Buffer{} + enc, err := ocf.NewEncoder(`"long"`, buf1) + require.NoError(t, err) + + err = enc.Encode(int64(1)) + require.NoError(t, err) + err = enc.Close() + require.NoError(t, err) + + // Get sync marker from first file + dec1, err := ocf.NewDecoder(bytes.NewReader(buf1.Bytes())) + require.NoError(t, err) + + reader1 := avro.NewReader(bytes.NewReader(buf1.Bytes()), 1024) + var h1 ocf.Header + reader1.ReadVal(ocf.HeaderSchema, &h1) + require.NoError(t, reader1.Error) + sync1 := h1.Sync + + // Reset to second buffer + buf2 := &bytes.Buffer{} + err = enc.Reset(buf2) + require.NoError(t, err) + + err = enc.Encode(int64(2)) + require.NoError(t, err) + err = enc.Close() + require.NoError(t, err) + + // Get sync marker from second file + reader2 := avro.NewReader(bytes.NewReader(buf2.Bytes()), 1024) + var h2 ocf.Header + reader2.ReadVal(ocf.HeaderSchema, &h2) + require.NoError(t, reader2.Error) + sync2 := h2.Sync + + // Sync markers should be different + assert.NotEqual(t, sync1, sync2, "each file should have a unique sync marker") + + // But both files should be readable + _ = dec1 + dec2, err := ocf.NewDecoder(buf2) + require.NoError(t, err) + require.True(t, dec2.HasNext()) +} + +// TestEncoder_ResetMultipleTimes tests multiple sequential resets. 
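+// Each reset should produce a complete, independently decodable OCF file.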
+func TestEncoder_ResetMultipleTimes(t *testing.T) { + buffers := make([]*bytes.Buffer, 5) + for i := range buffers { + buffers[i] = &bytes.Buffer{} + } + + enc, err := ocf.NewEncoder(`"long"`, buffers[0]) + require.NoError(t, err) + + for i := 0; i < 5; i++ { + if i > 0 { + err = enc.Reset(buffers[i]) + require.NoError(t, err) + } + + err = enc.Encode(int64(i * 10)) + require.NoError(t, err) + + err = enc.Close() + require.NoError(t, err) + } + + // Verify all files + for i := 0; i < 5; i++ { + dec, err := ocf.NewDecoder(buffers[i]) + require.NoError(t, err, "file %d", i) + + require.True(t, dec.HasNext(), "file %d", i) + var got int64 + err = dec.Decode(&got) + require.NoError(t, err, "file %d", i) + assert.Equal(t, int64(i*10), got, "file %d", i) + } +} + +// TestEncoder_AppendToExistingFile tests appending records to an existing OCF file. +func TestEncoder_AppendToExistingFile(t *testing.T) { + type SimpleRecord struct { + Name string `avro:"name"` + ID int64 `avro:"id"` + } + simpleSchema := `{"type":"record","name":"SimpleRecord","fields":[{"name":"name","type":"string"},{"name":"id","type":"long"}]}` + + record1 := SimpleRecord{Name: "first", ID: 1} + record2 := SimpleRecord{Name: "second", ID: 2} + + tmpFile, err := os.CreateTemp("", "append-test-*.avro") + require.NoError(t, err) + tmpName := tmpFile.Name() + t.Cleanup(func() { _ = os.Remove(tmpName) }) + + // Write first record + enc, err := ocf.NewEncoder(simpleSchema, tmpFile) + require.NoError(t, err) + err = enc.Encode(record1) + require.NoError(t, err) + err = enc.Close() + require.NoError(t, err) + err = tmpFile.Close() + require.NoError(t, err) + + // Reopen file and append second record + file, err := os.OpenFile(tmpName, os.O_RDWR, 0o644) + require.NoError(t, err) + + enc2, err := ocf.NewEncoder(simpleSchema, file) + require.NoError(t, err) + err = enc2.Encode(record2) + require.NoError(t, err) + err = enc2.Close() + require.NoError(t, err) + err = file.Close() + require.NoError(t, err) + + // Read back and verify both records + file, err = os.Open(tmpName) + require.NoError(t, err) + defer file.Close() + + dec, err := ocf.NewDecoder(file) + require.NoError(t, err) + + var records []SimpleRecord + for dec.HasNext() { + var r SimpleRecord + err = dec.Decode(&r) + require.NoError(t, err) + records = append(records, r) + } + require.NoError(t, dec.Error()) + + require.Len(t, records, 2) + assert.Equal(t, record1, records[0]) + assert.Equal(t, record2, records[1]) +} + +// TestEncoder_ResetPreservesCodec tests that codec is preserved across reset. +func TestEncoder_ResetPreservesCodec(t *testing.T) { + buf1 := &bytes.Buffer{} + enc, err := ocf.NewEncoder(`"long"`, buf1, ocf.WithCodec(ocf.Deflate)) + require.NoError(t, err) + + err = enc.Encode(int64(1)) + require.NoError(t, err) + err = enc.Close() + require.NoError(t, err) + + buf2 := &bytes.Buffer{} + err = enc.Reset(buf2) + require.NoError(t, err) + + err = enc.Encode(int64(2)) + require.NoError(t, err) + err = enc.Close() + require.NoError(t, err) + + // Both files should use deflate codec + dec1, err := ocf.NewDecoder(buf1) + require.NoError(t, err) + assert.Equal(t, []byte("deflate"), dec1.Metadata()["avro.codec"]) + + dec2, err := ocf.NewDecoder(buf2) + require.NoError(t, err) + assert.Equal(t, []byte("deflate"), dec2.Metadata()["avro.codec"]) +} From 83e86731586e2ec730341313b13e3af519b2b6da Mon Sep 17 00:00:00 2001 From: Rener Castro Date: Tue, 16 Dec 2025 20:08:21 -0300 Subject: [PATCH 3/8] Revert "support reseting encoder to reduce load on GC. 
(#587)" This reverts commit 2bffcc18ed6d90019199ef9c8a95de450767db6b. --- encoder.go | 5 ----- ocf/ocf.go | 37 ------------------------------------- 2 files changed, 42 deletions(-) diff --git a/encoder.go b/encoder.go index 49e3d7b..faa285e 100644 --- a/encoder.go +++ b/encoder.go @@ -31,11 +31,6 @@ func (e *Encoder) Encode(v any) error { return e.w.Error } -// Reset resets the encoder to write to a new io.Writer. -func (e *Encoder) Reset(w io.Writer) { - e.w.Reset(w) -} - // Marshal returns the Avro encoding of v. func Marshal(schema Schema, v any) ([]byte, error) { return DefaultConfig.Marshal(schema, v) diff --git a/ocf/ocf.go b/ocf/ocf.go index 8f8f0af..1a7d535 100644 --- a/ocf/ocf.go +++ b/ocf/ocf.go @@ -381,9 +381,6 @@ type Encoder struct { blockLength int count int blockSize int - - // Stored for Reset. - header Header } // NewEncoder returns a new encoder that writes to w using schema s. @@ -437,11 +434,6 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e codec: h.Codec, blockLength: cfg.BlockLength, blockSize: cfg.BlockSize, - header: Header{ - Magic: magicBytes, - Meta: h.Meta, - Sync: h.Sync, - }, } return e, nil } @@ -483,7 +475,6 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e codec: codec, blockLength: cfg.BlockLength, blockSize: cfg.BlockSize, - header: header, } return e, nil } @@ -572,34 +563,6 @@ func (e *Encoder) Close() error { return err } -// Reset flushes any pending data, resets the encoder to write to a new io.Writer, -// and writes a fresh header with a new sync marker. The schema, codec, and other -// settings are preserved from the original encoder. -// This allows reusing the encoder for multiple files without reallocating buffers. -func (e *Encoder) Reset(w io.Writer) error { - if err := e.Flush(); err != nil { - return err - } - - // Generate new sync marker for the new file. - _, _ = rand.Read(e.header.Sync[:]) - e.sync = e.header.Sync - - // Reset writer to new output and write header. - e.writer.Reset(w) - e.writer.WriteVal(HeaderSchema, e.header) - if err := e.writer.Flush(); err != nil { - return err - } - - // Reset buffer and encoder. 
- e.buf.Reset() - e.encoder.Reset(e.buf) - e.count = 0 - - return nil -} - func (e *Encoder) writerBlock() error { e.writer.WriteLong(int64(e.count)) From d9175e76fdea0a4586ccb0e3b2ada505ed67446f Mon Sep 17 00:00:00 2001 From: Rener Castro Date: Tue, 16 Dec 2025 20:19:26 -0300 Subject: [PATCH 4/8] test: add comprehensive tests for Avro reader skip methods --- reader_skip_test.go | 147 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) diff --git a/reader_skip_test.go b/reader_skip_test.go index 2610544..7103ddc 100644 --- a/reader_skip_test.go +++ b/reader_skip_test.go @@ -9,6 +9,153 @@ import ( "github.com/stretchr/testify/require" ) +func TestReader_SkipNBytes(t *testing.T) { + data := []byte{0x01, 0x01, 0x01, 0x36} + r := avro.NewReader(bytes.NewReader(data), 2) + + r.SkipNBytes(3) + + require.NoError(t, r.Error) + assert.Equal(t, int32(27), r.ReadInt()) +} + +func TestReader_SkipNBytesEOF(t *testing.T) { + data := []byte{0x01, 0x36} + r := avro.NewReader(bytes.NewReader(data), 2) + + r.SkipNBytes(3) + + assert.Error(t, r.Error) +} + +func TestReader_SkipBool(t *testing.T) { + data := []byte{0x01, 0x36} + r := avro.NewReader(bytes.NewReader(data), 10) + + r.SkipBool() + + require.NoError(t, r.Error) + assert.Equal(t, int32(27), r.ReadInt()) +} + +func TestReader_SkipInt(t *testing.T) { + tests := []struct { + name string + data []byte + }{ + { + name: "Skipped", + data: []byte{0x38, 0x36}, + }, + { + + name: "Overflow", + data: []byte{0xE2, 0xA2, 0xF3, 0xAD, 0xAD, 0x36}, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + r := avro.NewReader(bytes.NewReader(test.data), 10) + + r.SkipInt() + + require.NoError(t, r.Error) + assert.Equal(t, int32(27), r.ReadInt()) + }) + } +} + +func TestReader_SkipLong(t *testing.T) { + tests := []struct { + name string + data []byte + }{ + { + name: "Skipped", + data: []byte{0x38, 0x36}, + }, + { + + name: "Overflow", + data: []byte{0xE2, 0xA2, 0xF3, 0xAD, 0xAD, 0xAD, 0xE2, 0xA2, 0xF3, 0xAD, 0x36}, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + r := avro.NewReader(bytes.NewReader(test.data), 10) + + r.SkipLong() + + require.NoError(t, r.Error) + assert.Equal(t, int32(27), r.ReadInt()) + }) + } +} + +func TestReader_SkipFloat(t *testing.T) { + data := []byte{0x00, 0x00, 0x00, 0x00, 0x36} + r := avro.NewReader(bytes.NewReader(data), 10) + + r.SkipFloat() + + require.NoError(t, r.Error) + assert.Equal(t, int32(27), r.ReadInt()) +} + +func TestReader_SkipDouble(t *testing.T) { + data := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x36} + r := avro.NewReader(bytes.NewReader(data), 10) + + r.SkipDouble() + + require.NoError(t, r.Error) + assert.Equal(t, int32(27), r.ReadInt()) +} + +func TestReader_SkipString(t *testing.T) { + data := []byte{0x06, 0x66, 0x6F, 0x6F, 0x36} + r := avro.NewReader(bytes.NewReader(data), 10) + + r.SkipString() + + require.NoError(t, r.Error) + assert.Equal(t, int32(27), r.ReadInt()) +} + +func TestReader_SkipStringEmpty(t *testing.T) { + data := []byte{0x00, 0x36} + r := avro.NewReader(bytes.NewReader(data), 10) + + r.SkipString() + + require.NoError(t, r.Error) + assert.Equal(t, int32(27), r.ReadInt()) +} + +func TestReader_SkipBytes(t *testing.T) { + data := []byte{0x06, 0x66, 0x6F, 0x6F, 0x36} + r := avro.NewReader(bytes.NewReader(data), 10) + + r.SkipBytes() + + require.NoError(t, r.Error) + assert.Equal(t, int32(27), r.ReadInt()) +} + +func TestReader_SkipBytesEmpty(t *testing.T) { + data := 
[]byte{0x00, 0x36} + r := avro.NewReader(bytes.NewReader(data), 10) + + r.SkipBytes() + + require.NoError(t, r.Error) + assert.Equal(t, int32(27), r.ReadInt()) +} + func TestReader_SkipTo(t *testing.T) { tests := []struct { name string From 60eaece97966f3b6178c6c0a7fd1b000c38ac456 Mon Sep 17 00:00:00 2001 From: Rener Castro Date: Tue, 16 Dec 2025 20:23:06 -0300 Subject: [PATCH 5/8] fix tests --- ocf/ocf_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ocf/ocf_test.go b/ocf/ocf_test.go index 3625dac..5b46904 100644 --- a/ocf/ocf_test.go +++ b/ocf/ocf_test.go @@ -1277,6 +1277,7 @@ func TestWithSchemaMarshaler(t *testing.T) { want, err := os.ReadFile("testdata/full-schema.json") require.NoError(t, err) + want = bytes.ReplaceAll(want, []byte("\r\n"), []byte("\n")) assert.Equal(t, want, got) } @@ -1443,6 +1444,7 @@ func TestConcurrentDecode(t *testing.T) { }) } } + // TestEncoder_Reset tests that Reset allows reusing encoder for multiple files. func TestEncoder_Reset(t *testing.T) { record1 := FullRecord{ From bcc8be688162ed54f1c24691fffcc11783f71a27 Mon Sep 17 00:00:00 2001 From: Rener Castro Date: Tue, 16 Dec 2025 20:29:58 -0300 Subject: [PATCH 6/8] test --- ocf/ocf_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocf/ocf_test.go b/ocf/ocf_test.go index 5b46904..35932ad 100644 --- a/ocf/ocf_test.go +++ b/ocf/ocf_test.go @@ -1347,7 +1347,7 @@ func TestConcurrentDecode(t *testing.T) { require.NoError(t, err) // concurrency values to test; caller requirement: configurable concurrency - concs := []int64{1} + concs := []int64{1, 2, 3, 5} // split file into parts by size and let workers align to sync using SkipTo headerEnd := r0.InputOffset() From 069ff003b6a2be414a8372c3757b8a175d39a3d0 Mon Sep 17 00:00:00 2001 From: Rener Castro Date: Tue, 16 Dec 2025 20:49:32 -0300 Subject: [PATCH 7/8] golangci-lint --- ocf/ocf.go | 7 +++++-- reader_skip.go | 10 ++-------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/ocf/ocf.go b/ocf/ocf.go index 8f8f0af..9152308 100644 --- a/ocf/ocf.go +++ b/ocf/ocf.go @@ -138,7 +138,7 @@ func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) { }, nil } -// NewDecoder returns a new decoder that reads from reader r. +// NewDecoderWithHeader returns a new decoder that reads from reader r using the provided header. func NewDecoderWithHeader(r *avro.Reader, h *OCFHeader, opts ...DecoderFunc) (*Decoder, error) { cfg := newDecoderConfig(opts...) decReader := bytesx.NewResetReader([]byte{}) @@ -216,6 +216,7 @@ func (d *Decoder) Close() error { return nil } +// BlockStatus represents the status of the current block. type BlockStatus struct { Current int64 Count int64 @@ -223,6 +224,7 @@ type BlockStatus struct { Offset int64 } +// BlockStatus returns the current block status. func (d *Decoder) BlockStatus() *BlockStatus { return &BlockStatus{ Current: d.n - d.count + 1, @@ -615,7 +617,8 @@ func (e *Encoder) writerBlock() error { return e.writer.Flush() } -type OCFHeader struct { +// OCFHeader represents the parsed header of an OCF file. 
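+// It can be read once with DecodeHeader and shared across decoders created
+// with NewDecoderWithHeader.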
+type OCFHeader struct { //nolint:revive Schema avro.Schema Codec Codec Meta map[string][]byte diff --git a/reader_skip.go b/reader_skip.go index 16e23ae..4cfdc90 100644 --- a/reader_skip.go +++ b/reader_skip.go @@ -99,10 +99,7 @@ func (r *Reader) SkipTo(token []byte) (int, error) { for { // Check boundary if we have stash from previous read if len(stash) > 0 { - need := tokenLen - 1 - if r.tail-r.head < need { - need = r.tail - r.head - } + need := min(r.tail-r.head, tokenLen-1) // Construct boundary window: stash + beginning of new buffer boundary := make([]byte, len(stash)+need) @@ -136,10 +133,7 @@ func (r *Reader) SkipTo(token []byte) (int, error) { // Prepare stash for next iteration available := r.tail - r.head - keep := tokenLen - 1 - if keep > available { - keep = available - } + keep := min(tokenLen-1, available) // Bytes that are definitely skipped (not kept in stash) consumed := available - keep From 282a47f976b06839a822c83c79f7f6770a2a03ee Mon Sep 17 00:00:00 2001 From: Rener Castro Date: Wed, 17 Dec 2025 23:17:12 -0300 Subject: [PATCH 8/8] fix: change writer container config to default on ocf.Encoder --- ocf/ocf.go | 4 ++-- ocf/ocf_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/ocf/ocf.go b/ocf/ocf.go index 9152308..02442f7 100644 --- a/ocf/ocf.go +++ b/ocf/ocf.go @@ -429,7 +429,7 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e return nil, err } - writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig)) + writer := avro.NewWriter(w, 512, avro.WithWriterConfig(avro.DefaultConfig)) buf := &bytes.Buffer{} e := &Encoder{ writer: writer, @@ -470,7 +470,7 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e return nil, err } - writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig)) + writer := avro.NewWriter(w, 512, avro.WithWriterConfig(avro.DefaultConfig)) writer.WriteVal(HeaderSchema, header) if err = writer.Flush(); err != nil { return nil, err diff --git a/ocf/ocf_test.go b/ocf/ocf_test.go index 35932ad..5d9c98c 100644 --- a/ocf/ocf_test.go +++ b/ocf/ocf_test.go @@ -1707,3 +1707,58 @@ func TestEncoder_ResetPreservesCodec(t *testing.T) { require.NoError(t, err) assert.Equal(t, []byte("deflate"), dec2.Metadata()["avro.codec"]) } + +type CustomTagTestObject struct { + StringField string `json:"string_field"` + IntField int `json:"int_field"` +} + +func TestCustomTagKey(t *testing.T) { + // Define schema matching the json tags + schemaStr := `{ + "type": "record", + "name": "CustomTagTestObject", + "fields": [ + {"name": "string_field", "type": "string"}, + {"name": "int_field", "type": "int"} + ] + }` + + // Create a Config with TagKey set to "json" + config := avro.Config{ + TagKey: "json", + }.Freeze() + + // Create a buffer to write the OCF file to + var buf bytes.Buffer + + // Create OCF encoder with custom encoding config + enc, err := ocf.NewEncoder(schemaStr, &buf, ocf.WithEncodingConfig(config)) + require.NoError(t, err) + + // Data to encode + data := CustomTagTestObject{ + StringField: "hello", + IntField: 42, + } + + // Encode using the OCF encoder + err = enc.Encode(data) + require.NoError(t, err) + + // Close the encoder to flush data + err = enc.Close() + require.NoError(t, err) + + // Verify the output by decoding + dec, err := ocf.NewDecoder(&buf, ocf.WithDecoderConfig(config)) + require.NoError(t, err) + + var result CustomTagTestObject + require.True(t, dec.HasNext()) + err = 
dec.Decode(&result) + require.NoError(t, err) + + assert.Equal(t, data.StringField, result.StringField) + assert.Equal(t, data.IntField, result.IntField) +}
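
Usage sketch (not part of the series): the pieces added in PATCH 1 and kept through PATCH 7 compose into parallel OCF decoding. DecodeHeader parses the header once, workers after the first align to the next sync marker with SkipTo, NewDecoderWithHeader shares the parsed header, and BlockStatus tells a worker when it has drained a block ending past its partition. The package name, record type (long), and bare-bones error handling below are illustrative assumptions, condensed from TestConcurrentDecode above.

package ocfparallel

import (
	"bytes"
	"sync"

	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/ocf"
)

// decodeConcurrently splits data after the OCF header into roughly equal
// partitions and decodes each one in its own goroutine. Errors simply end a
// worker here; real code should collect them as TestConcurrentDecode does.
func decodeConcurrently(data []byte, workers int64) []int64 {
	r0 := avro.NewReader(bytes.NewReader(data), 1024)
	hdr, err := ocf.DecodeHeader(r0)
	if err != nil {
		return nil
	}
	headerEnd := r0.InputOffset()
	partSize := (int64(len(data)) - headerEnd) / workers

	out := make(chan int64, 1024)
	var wg sync.WaitGroup
	for i := int64(0); i < workers; i++ {
		start := headerEnd + i*partSize
		end := headerEnd + (i+1)*partSize
		if i == workers-1 {
			// The final worker extends its partition to the end of the file.
			end = int64(len(data))
		}
		wg.Add(1)
		go func(start, end int64) {
			defer wg.Done()
			r := avro.NewReader(bytes.NewReader(data[start:]), 1024)
			if start != headerEnd {
				// Partitions that begin mid-block align to the next sync
				// marker; a failed or out-of-range skip means no block
				// starts inside this partition.
				n, err := r.SkipTo(hdr.Sync[:])
				if err != nil || start+int64(n) >= end {
					return
				}
			}
			dec, err := ocf.NewDecoderWithHeader(r, hdr)
			if err != nil {
				return
			}
			defer dec.Close()
			for dec.HasNext() {
				var v int64
				if err := dec.Decode(&v); err != nil {
					return
				}
				out <- v
				// Once a block is exhausted (Current > Count) and it ended
				// past the partition boundary, the next worker owns the rest.
				if bs := dec.BlockStatus(); bs.Current > bs.Count && start+bs.Offset > end {
					return
				}
			}
		}(start, end)
	}
	go func() { wg.Wait(); close(out) }()

	var vals []int64
	for v := range out {
		vals = append(vals, v)
	}
	return vals
}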