From 89d79a96aab61c7596ca4505c82f2992469b2be0 Mon Sep 17 00:00:00 2001 From: aryehklein-rise Date: Mon, 15 Dec 2025 13:26:31 +0200 Subject: [PATCH 1/2] support reusing the same zstd encoder/decoder. --- ocf/codec.go | 21 ++++++++++++-- ocf/ocf.go | 18 ++++++++++++ ocf/ocf_test.go | 74 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 2 deletions(-) diff --git a/ocf/codec.go b/ocf/codec.go index 8d0cab2..db7c9cf 100644 --- a/ocf/codec.go +++ b/ocf/codec.go @@ -32,6 +32,10 @@ type codecOptions struct { type zstdOptions struct { EOptions []zstd.EOption DOptions []zstd.DOption + // Encoder and Decoder allow sharing pre-created instances across multiple codecs. + // When set, EOptions/DOptions are ignored for that component. + Encoder *zstd.Encoder + Decoder *zstd.Decoder } func resolveCodec(name CodecName, codecOpts codecOptions) (Codec, error) { @@ -143,8 +147,21 @@ type ZStandardCodec struct { } func newZStandardCodec(opts zstdOptions) *ZStandardCodec { - decoder, _ := zstd.NewReader(nil, opts.DOptions...) - encoder, _ := zstd.NewWriter(nil, opts.EOptions...) + var decoder *zstd.Decoder + var encoder *zstd.Encoder + + if opts.Decoder != nil { + decoder = opts.Decoder + } else { + decoder, _ = zstd.NewReader(nil, opts.DOptions...) + } + + if opts.Encoder != nil { + encoder = opts.Encoder + } else { + encoder, _ = zstd.NewWriter(nil, opts.EOptions...) + } + return &ZStandardCodec{ decoder: decoder, encoder: encoder, diff --git a/ocf/ocf.go b/ocf/ocf.go index 4a1030e..690f903 100644 --- a/ocf/ocf.go +++ b/ocf/ocf.go @@ -84,6 +84,15 @@ func WithZStandardDecoderOptions(opts ...zstd.DOption) DecoderFunc { } } +// WithZStandardDecoder sets a pre-created ZStandard decoder to be reused. +// This allows sharing a single decoder across multiple OCF decoders for efficiency. +// The caller is responsible for closing the decoder after all OCF decoders are done. 
+func WithZStandardDecoder(dec *zstd.Decoder) DecoderFunc { + return func(cfg *decoderConfig) { + cfg.CodecOptions.ZStandardOptions.Decoder = dec + } +} + // Decoder reads and decodes Avro values from a container file. type Decoder struct { reader *avro.Reader @@ -268,6 +277,15 @@ func WithZStandardEncoderOptions(opts ...zstd.EOption) EncoderFunc { } } +// WithZStandardEncoder sets a pre-created ZStandard encoder to be reused. +// This allows sharing a single encoder across multiple OCF encoders for efficiency. +// The caller is responsible for closing the encoder after all OCF encoders are done. +func WithZStandardEncoder(enc *zstd.Encoder) EncoderFunc { + return func(cfg *encoderConfig) { + cfg.CodecOptions.ZStandardOptions.Encoder = enc + } +} + // WithMetadata sets the metadata on the encoder header. func WithMetadata(meta map[string][]byte) EncoderFunc { return func(cfg *encoderConfig) { diff --git a/ocf/ocf_test.go b/ocf/ocf_test.go index b22e6dc..7452151 100644 --- a/ocf/ocf_test.go +++ b/ocf/ocf_test.go @@ -1312,3 +1312,77 @@ type errorHeaderWriter struct{} func (*errorHeaderWriter) Write(p []byte) (int, error) { return 0, errors.New("test") } + +func TestSharedZstdEncoder(t *testing.T) { + schema := `{"type": "string"}` + + // Create a shared zstd encoder + sharedEncoder, err := zstd.NewWriter(nil) + require.NoError(t, err) + defer sharedEncoder.Close() + + // Use the shared encoder with multiple OCF encoders + var buf1, buf2 bytes.Buffer + + enc1, err := ocf.NewEncoder(schema, &buf1, ocf.WithCodec(ocf.ZStandard), ocf.WithZStandardEncoder(sharedEncoder)) + require.NoError(t, err) + require.NoError(t, enc1.Encode("hello from encoder 1")) + require.NoError(t, enc1.Close()) + + enc2, err := ocf.NewEncoder(schema, &buf2, ocf.WithCodec(ocf.ZStandard), ocf.WithZStandardEncoder(sharedEncoder)) + require.NoError(t, err) + require.NoError(t, enc2.Encode("hello from encoder 2")) + require.NoError(t, enc2.Close()) + + // Verify both files can be read + dec1, err := 
ocf.NewDecoder(&buf1) + require.NoError(t, err) + require.True(t, dec1.HasNext()) + var result1 string + require.NoError(t, dec1.Decode(&result1)) + assert.Equal(t, "hello from encoder 1", result1) + + dec2, err := ocf.NewDecoder(&buf2) + require.NoError(t, err) + require.True(t, dec2.HasNext()) + var result2 string + require.NoError(t, dec2.Decode(&result2)) + assert.Equal(t, "hello from encoder 2", result2) +} + +func TestSharedZstdDecoder(t *testing.T) { + schema := `{"type": "string"}` + + // Create two OCF files + var buf1, buf2 bytes.Buffer + + enc1, err := ocf.NewEncoder(schema, &buf1, ocf.WithCodec(ocf.ZStandard)) + require.NoError(t, err) + require.NoError(t, enc1.Encode("data in file 1")) + require.NoError(t, enc1.Close()) + + enc2, err := ocf.NewEncoder(schema, &buf2, ocf.WithCodec(ocf.ZStandard)) + require.NoError(t, err) + require.NoError(t, enc2.Encode("data in file 2")) + require.NoError(t, enc2.Close()) + + // Create a shared zstd decoder + sharedDecoder, err := zstd.NewReader(nil) + require.NoError(t, err) + defer sharedDecoder.Close() + + // Use the shared decoder with multiple OCF decoders + dec1, err := ocf.NewDecoder(&buf1, ocf.WithZStandardDecoder(sharedDecoder)) + require.NoError(t, err) + require.True(t, dec1.HasNext()) + var result1 string + require.NoError(t, dec1.Decode(&result1)) + assert.Equal(t, "data in file 1", result1) + + dec2, err := ocf.NewDecoder(&buf2, ocf.WithZStandardDecoder(sharedDecoder)) + require.NoError(t, err) + require.True(t, dec2.HasNext()) + var result2 string + require.NoError(t, dec2.Decode(&result2)) + assert.Equal(t, "data in file 2", result2) +} From d7a289b2a7a3e6484081351934ead959a3613eb0 Mon Sep 17 00:00:00 2001 From: aryehlev Date: Tue, 16 Dec 2025 20:51:06 +0200 Subject: [PATCH 2/2] don't close shared encoder/decoder.
--- ocf/codec.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/ocf/codec.go b/ocf/codec.go index a61f916..8029af3 100644 --- a/ocf/codec.go +++ b/ocf/codec.go @@ -142,29 +142,36 @@ func (*SnappyCodec) Encode(b []byte) []byte { // ZStandardCodec is a zstandard compression codec. type ZStandardCodec struct { - decoder *zstd.Decoder - encoder *zstd.Encoder + decoder *zstd.Decoder + encoder *zstd.Encoder + sharedDecoder bool // true if decoder was provided externally and should not be closed + sharedEncoder bool // true if encoder was provided externally and should not be closed } func newZStandardCodec(opts zstdOptions) *ZStandardCodec { var decoder *zstd.Decoder var encoder *zstd.Encoder + var sharedDecoder, sharedEncoder bool if opts.Decoder != nil { decoder = opts.Decoder + sharedDecoder = true } else { decoder, _ = zstd.NewReader(nil, opts.DOptions...) } if opts.Encoder != nil { encoder = opts.Encoder + sharedEncoder = true } else { encoder, _ = zstd.NewWriter(nil, opts.EOptions...) } return &ZStandardCodec{ - decoder: decoder, - encoder: encoder, + decoder: decoder, + encoder: encoder, + sharedDecoder: sharedDecoder, + sharedEncoder: sharedEncoder, } } @@ -181,11 +188,12 @@ func (zstdCodec *ZStandardCodec) Encode(b []byte) []byte { } // Close closes the zstandard encoder and decoder, releasing resources. +// Shared instances (provided via WithZStandardEncoder/WithZStandardDecoder) are not closed. func (zstdCodec *ZStandardCodec) Close() error { - if zstdCodec.decoder != nil { + if zstdCodec.decoder != nil && !zstdCodec.sharedDecoder { zstdCodec.decoder.Close() } - if zstdCodec.encoder != nil { + if zstdCodec.encoder != nil && !zstdCodec.sharedEncoder { return zstdCodec.encoder.Close() } return nil