From bab7db931b7c442ecfc50c895bf4ee125410a6f6 Mon Sep 17 00:00:00 2001 From: Hyeonjae Date: Tue, 8 Jun 2021 18:37:49 +0900 Subject: [PATCH 1/3] add custom codec --- codec.go | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/codec.go b/codec.go index 51f4fd7e..733ffbe0 100644 --- a/codec.go +++ b/codec.go @@ -113,6 +113,46 @@ func NewCodec(schemaSpecification string) (*Codec, error) { return c, nil } +func NewCustomCodec(schemaSpecification string, customCodec map[string]*Codec) (*Codec, error) { + var schema interface{} + + if err := json.Unmarshal([]byte(schemaSpecification), &schema); err != nil { + return nil, fmt.Errorf("cannot unmarshal schema JSON: %s", err) + } + + // bootstrap a symbol table with primitive type codecs for the new codec + st := newSymbolTable() + codecs := mergeCodec(st, customCodec) + + c, err := buildCodec(codecs, nullNamespace, schema) + if err != nil { + return nil, err + } + c.schemaCanonical, err = parsingCanonicalForm(schema, "", make(map[string]string)) + if err != nil { + return nil, err // should not get here because schema was validated above + } + + c.Rabin = rabin([]byte(c.schemaCanonical)) + c.soeHeader = []byte{0xC3, 0x01, 0, 0, 0, 0, 0, 0, 0, 0} + binary.LittleEndian.PutUint64(c.soeHeader[2:], c.Rabin) + + c.schemaOriginal = schemaSpecification + return c, nil +} + +func mergeCodec(origin, custom map[string]*Codec) map[string]*Codec { + codecs := map[string]*Codec{} + for k, v := range origin { + codecs[k] = v + } + + for k, v := range custom { + codecs[k] = v + } + return codecs +} + func newSymbolTable() map[string]*Codec { return map[string]*Codec{ "boolean": { From 5c528d11afcb54eb4943a8d004ed9dc12db49fbf Mon Sep 17 00:00:00 2001 From: Hyeonjae Date: Wed, 9 Jun 2021 20:43:18 +0900 Subject: [PATCH 2/3] add codec modifier --- codec.go | 46 ++++++---------------------------------------- 1 file changed, 6 insertions(+), 40 deletions(-) diff --git a/codec.go b/codec.go index 733ffbe0..b692bd46 100644 --- a/codec.go +++ b/codec.go @@ -42,6 +42,8 @@ var ( MaxBlockSize = int64(math.MaxInt32) ) +type CodecModifier func(map[string]*Codec) + // Codec supports decoding binary and text Avro data to Go native data types, // and conversely encoding Go native data types to binary or text Avro data. A // Codec is created as a stateless structure that can be safely used in multiple @@ -86,7 +88,7 @@ type Codec struct { // if err != nil { // fmt.Println(err) // } -func NewCodec(schemaSpecification string) (*Codec, error) { +func NewCodec(schemaSpecification string, modifiers ...CodecModifier) (*Codec, error) { var schema interface{} if err := json.Unmarshal([]byte(schemaSpecification), &schema); err != nil { @@ -96,35 +98,11 @@ func NewCodec(schemaSpecification string) (*Codec, error) { // bootstrap a symbol table with primitive type codecs for the new codec st := newSymbolTable() - c, err := buildCodec(st, nullNamespace, schema) - if err != nil { - return nil, err - } - c.schemaCanonical, err = parsingCanonicalForm(schema, "", make(map[string]string)) - if err != nil { - return nil, err // should not get here because schema was validated above + for _, modifier := range modifiers { + modifier(st) } - c.Rabin = rabin([]byte(c.schemaCanonical)) - c.soeHeader = []byte{0xC3, 0x01, 0, 0, 0, 0, 0, 0, 0, 0} - binary.LittleEndian.PutUint64(c.soeHeader[2:], c.Rabin) - - c.schemaOriginal = schemaSpecification - return c, nil -} - -func NewCustomCodec(schemaSpecification string, customCodec map[string]*Codec) (*Codec, error) { - var schema interface{} - - if err := json.Unmarshal([]byte(schemaSpecification), &schema); err != nil { - return nil, fmt.Errorf("cannot unmarshal schema JSON: %s", err) - } - - // bootstrap a symbol table with primitive type codecs for the new codec - st := newSymbolTable() - codecs := mergeCodec(st, customCodec) - - c, err := buildCodec(codecs, nullNamespace, schema) + c, err := buildCodec(st, nullNamespace, schema) if err != nil { return nil, err } @@ -141,18 +119,6 @@ func NewCustomCodec(schemaSpecification string, customCodec map[string]*Codec) ( return c, nil } -func mergeCodec(origin, custom map[string]*Codec) map[string]*Codec { - codecs := map[string]*Codec{} - for k, v := range origin { - codecs[k] = v - } - - for k, v := range custom { - codecs[k] = v - } - return codecs -} - func newSymbolTable() map[string]*Codec { return map[string]*Codec{ "boolean": { From c049f0f08f61b372d63c3dc17f53b7f1a2e94c61 Mon Sep 17 00:00:00 2001 From: Hyeonjae Date: Wed, 9 Jun 2021 21:08:39 +0900 Subject: [PATCH 3/3] add testcase --- record_test.go | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/record_test.go b/record_test.go index 21100d61..5d354a69 100644 --- a/record_test.go +++ b/record_test.go @@ -445,6 +445,82 @@ func TestRecordRecursiveRoundTrip(t *testing.T) { } } +func TestUnknownTypeCodecError(t *testing.T) { + _, err := NewCodec(` +{ + "type": "record", + "name": "Parent", + "fields" : [ + {"name": "child", "type": "Child"} + ] +} +`) + ensureError(t, err, "Record \"Parent\" field 1 ought to be valid Avro named type: unknown type name: \"Child\"") +} + +func TestCodecModifier(t *testing.T) { + childCodec, err := NewCodec(` +{ + "type": "record", + "name": "Child", + "fields" : [ + {"name": "age", "type": "int"} + ] +} +`) + ensureError(t, err) + + modifier := func(st map[string]*Codec) { + st["Child"] = childCodec + } + + codec, err := NewCodec(` +{ + "type": "record", + "name": "Parent", + "fields" : [ + {"name": "child", "type": "Child"} + ] +} +`, modifier) + ensureError(t, err) + + child := map[string]interface{} { + "age": 7, + } + parent := map[string]interface{} { + "child": child, + } + + // Convert native Go form to binary Avro data + buf, err := codec.BinaryFromNative(nil, parent) + ensureError(t, err) + + // Convert binary Avro data back to native Go form + datum, _, err := codec.NativeFromBinary(buf) + ensureError(t, err) + + actual, ok := datum.(map[string]interface{}) + if !ok { + t.Fatalf("origin data contaminated") + } + + child, ok = actual["child"].(map[string]interface{}) + if !ok { + t.Fatalf("child type contaminated") + } + + age, ok := child["age"].(int32) + if !ok { + t.Fatalf("child age field contaminated") + } + + if age != 7 { + t.Fatalf("child age data contaminated") + } +} + + func ExampleRecordRecursiveRoundTrip() { codec, err := NewCodec(` {