diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java index 1f5b4a2d4e..fca5c66849 100644 --- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java +++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java @@ -84,6 +84,7 @@ public abstract class AvroModelHandler private final Int2ObjectCache> writers; private final Int2ObjectCache records; private final Int2IntHashMap paddings; + private final Int2IntHashMap avroOverheads; private final AvroBytesFW bytesRO; private final AvroIntFW intRO; private final AvroLongFW longRO; @@ -115,6 +116,7 @@ protected AvroModelHandler( this.writers = new Int2ObjectCache<>(1, 1024, i -> {}); this.records = new Int2ObjectCache<>(1, 1024, i -> {}); this.paddings = new Int2IntHashMap(-1); + this.avroOverheads = new Int2IntHashMap(-1); this.expandable = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer()); this.in = new DirectBufferInputStream(); this.event = new AvroModelEventContext(context); @@ -195,6 +197,12 @@ protected final int supplyPadding( return paddings.computeIfAbsent(schemaId, id -> calculatePadding(supplySchema(id))); } + protected final int supplyAvroOverhead( + int schemaId) + { + return avroOverheads.computeIfAbsent(schemaId, id -> calculateAvroOverhead(supplySchema(id))); + } + protected final GenericDatumReader supplyReader( int schemaId) { @@ -311,6 +319,30 @@ private int calculatePadding( return padding; } + private int calculateAvroOverhead(Schema schema) + { + int overhead = 0; + if (schema != null) + { + switch (schema.getType()) + { + case RECORD: + for (Schema.Field field : schema.getFields()) + { + overhead += calculateAvroOverhead(field.schema()); + } + break; + case UNION: + List types = schema.getTypes(); + for (Schema type : types) + { + overhead += (type.getType().equals(Schema.Type.NULL)) ? calculateAvroOverhead(type) : 1; + } + } + } + return overhead; + } + private void extract( Schema schema, DirectBuffer data, diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java index 6b8e3e0ac6..f06d2bfa0d 100644 --- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java +++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java @@ -46,7 +46,16 @@ public int padding( int index, int length) { - return handler.encodePadding(length); + int padding = handler.encodePadding(length); + int schemaId = catalog != null && catalog.id > 0 + ? catalog.id + : handler.resolve(subject, catalog.version); + + if (VIEW_JSON.equals(view)) + { + padding += supplyAvroOverhead(schemaId); + } + return padding; } @Override diff --git a/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonDecoder.java b/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonDecoder.java index 3656e1d44d..a6aec3bc5e 100644 --- a/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonDecoder.java +++ b/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonDecoder.java @@ -103,7 +103,9 @@ public int readIndex() throws IOException String label; final JsonToken currentToken = lin.getCurrentToken(); - if (currentToken == JsonToken.VALUE_NULL) + if (currentToken == JsonToken.VALUE_NULL || + currentToken == JsonToken.END_OBJECT || + currentToken == JsonToken.FIELD_NAME) { label = "null"; } @@ -140,6 +142,14 @@ else if (currentToken == JsonToken.START_OBJECT && } catch (InvocationTargetException ex) { + if (ex.getTargetException() instanceof IOException) + { + throw (IOException) ex.getTargetException(); + } + else if (ex.getTargetException() instanceof RuntimeException) + { + throw (RuntimeException) ex.getTargetException(); + } throw new RuntimeException(ex); } } diff --git a/runtime/model-avro/src/main/java/org/apache/avro/io/JsonDecoder.java b/runtime/model-avro/src/main/java/org/apache/avro/io/JsonDecoder.java index 9de5203574..26aeb6cf8a 100644 --- a/runtime/model-avro/src/main/java/org/apache/avro/io/JsonDecoder.java +++ b/runtime/model-avro/src/main/java/org/apache/avro/io/JsonDecoder.java @@ -159,7 +159,7 @@ public void readNull() throws IOException { in.nextToken(); } - else + else if (in.getCurrentToken() != JsonToken.END_OBJECT) { throw error("null"); } diff --git a/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java b/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java index e7cfb52f07..cb43a00898 100644 --- a/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java +++ b/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java @@ -352,6 +352,38 @@ public void shouldVerifyPaddingLength() assertEquals(525, converter.padding(data, 0, data.capacity())); } + @Test + public void shouldVerifyWritePaddingLengthWithNull() + { + TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new) + .namespace("test") + .name("test0") + .type("test") + .options(TestCatalogOptionsConfig::builder) + .id(9) + .schema(SCHEMA_WITH_NULL) + .build() + .build(); + AvroModelConfig model = AvroModelConfig.builder() + .view("json") + .catalog() + .name("test0") + .schema() + .strategy("topic") + .version("latest") + .subject("test-value") + .build() + .build() + .build(); + + when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options)); + AvroWriteConverterHandler converter = new AvroWriteConverterHandler(config, model, context); + + DirectBuffer data = new UnsafeBuffer(); + + assertEquals(1, converter.padding(data, 0, data.capacity())); + } + @Test public void shouldExtract() {