From 3d3b2d67bb4077e3cefdc43fff15834f8543c763 Mon Sep 17 00:00:00 2001 From: "davyd.eliosidze@zopa.com" Date: Fri, 7 Aug 2020 23:16:17 +0200 Subject: [PATCH] Fix decimal serialization --- client/package-lock.json | 18 ++++++ src/main/java/org/akhq/models/Record.java | 6 +- src/main/java/org/akhq/models/Schema.java | 8 +-- ...lizer.java => AvroSchemaDeserializer.java} | 2 +- ...ializer.java => AvroSchemaSerializer.java} | 2 +- .../org/akhq/utils/AvroToJsonSerializer.java | 63 +++++++++++++++++++ src/test/avro/Dog.avsc | 10 +++ ...est.java => AvroSchemaSerializerTest.java} | 2 +- .../modules/AvroToJsonSerializerTest.java | 54 ++++++++++++++++ .../repositories/RecordRepositoryTest.java | 2 +- 10 files changed, 156 insertions(+), 11 deletions(-) rename src/main/java/org/akhq/utils/{AvroDeserializer.java => AvroSchemaDeserializer.java} (88%) rename src/main/java/org/akhq/utils/{AvroSerializer.java => AvroSchemaSerializer.java} (88%) create mode 100644 src/main/java/org/akhq/utils/AvroToJsonSerializer.java create mode 100644 src/test/avro/Dog.avsc rename src/test/java/org/akhq/modules/{AvroSerializerTest.java => AvroSchemaSerializerTest.java} (98%) create mode 100644 src/test/java/org/akhq/modules/AvroToJsonSerializerTest.java diff --git a/client/package-lock.json b/client/package-lock.json index b14e3103d..eadbed274 100644 --- a/client/package-lock.json +++ b/client/package-lock.json @@ -3469,6 +3469,15 @@ "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.1.0.tgz", "integrity": "sha512-1Yj8h9Q+QDF5FzhMs/c9+6UntbD5MkRfRwac8DoEm9ZfUBZ7tZ55YcGVAzEe4bXsdQHEk+s9S5wsOKVdZrw0tQ==" }, + "bindings": { + "version": "1.5.0", + "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.5.0.tgz", + "integrity": "sha512-p2q/t/mhvuOj/UeLlV6566GD/guowlr0hHxClI0W9m7MWYkL1F0hLo+0Aexs9HSPCtR1SXQ0TD3MMKrXZajbiQ==", + "optional": true, + "requires": { + "file-uri-to-path": "1.0.0" + } + }, "block-stream": { "version": "0.0.9", "resolved": "https://registry.npmjs.org/block-stream/-/block-stream-0.0.9.tgz", @@ -6475,6 +6484,12 @@ "schema-utils": "^2.5.0" } }, + "file-uri-to-path": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/file-uri-to-path/-/file-uri-to-path-1.0.0.tgz", + "integrity": "sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw==", + "optional": true + }, "filesize": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/filesize/-/filesize-6.0.1.tgz", @@ -8205,6 +8220,7 @@ "integrity": "sha512-oWb1Z6mkHIskLzEJ/XWX0srkpkTQ7vaopMQkyaEIoq0fmtFVxOthb8cCxeT+p3ynTdkk/RZwbgG4brR5BeWECw==", "optional": true, "requires": { + "bindings": "^1.5.0", "nan": "^2.12.1" } } @@ -15263,6 +15279,7 @@ "integrity": "sha512-oWb1Z6mkHIskLzEJ/XWX0srkpkTQ7vaopMQkyaEIoq0fmtFVxOthb8cCxeT+p3ynTdkk/RZwbgG4brR5BeWECw==", "optional": true, "requires": { + "bindings": "^1.5.0", "nan": "^2.12.1" } }, @@ -15578,6 +15595,7 @@ "integrity": "sha512-oWb1Z6mkHIskLzEJ/XWX0srkpkTQ7vaopMQkyaEIoq0fmtFVxOthb8cCxeT+p3ynTdkk/RZwbgG4brR5BeWECw==", "optional": true, "requires": { + "bindings": "^1.5.0", "nan": "^2.12.1" } }, diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java index 30d273d1f..8d3a77163 100644 --- a/src/main/java/org/akhq/models/Record.java +++ b/src/main/java/org/akhq/models/Record.java @@ -3,12 +3,12 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import lombok.*; +import org.akhq.utils.AvroToJsonSerializer; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.record.TimestampType; -import org.joda.time.DateTime; import java.nio.ByteBuffer; import java.time.Instant; @@ -107,8 +107,8 @@ private String convertToString(byte[] payload, Integer keySchemaId) { return null; } else if (keySchemaId != null) { try { - GenericRecord deserialize = (GenericRecord) kafkaAvroDeserializer.deserialize(topic, payload); - return deserialize.toString(); + GenericRecord record = (GenericRecord) kafkaAvroDeserializer.deserialize(topic, payload); + return AvroToJsonSerializer.toJson(record); } catch (Exception exception) { return new String(payload); } diff --git a/src/main/java/org/akhq/models/Schema.java b/src/main/java/org/akhq/models/Schema.java index 11024f838..123954875 100644 --- a/src/main/java/org/akhq/models/Schema.java +++ b/src/main/java/org/akhq/models/Schema.java @@ -5,8 +5,8 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.google.common.annotations.VisibleForTesting; import lombok.*; -import org.akhq.utils.AvroDeserializer; -import org.akhq.utils.AvroSerializer; +import org.akhq.utils.AvroSchemaDeserializer; +import org.akhq.utils.AvroSchemaSerializer; import org.apache.avro.AvroTypeException; import org.apache.avro.Schema.Parser; @@ -28,8 +28,8 @@ public class Schema { private Integer version; private Config.CompatibilityLevelConfig compatibilityLevel; - @JsonSerialize(using = AvroSerializer.class) - @JsonDeserialize(using = AvroDeserializer.class) + @JsonSerialize(using = AvroSchemaSerializer.class) + @JsonDeserialize(using = AvroSchemaDeserializer.class) private org.apache.avro.Schema schema; private String exception; diff --git a/src/main/java/org/akhq/utils/AvroDeserializer.java b/src/main/java/org/akhq/utils/AvroSchemaDeserializer.java similarity index 88% rename from src/main/java/org/akhq/utils/AvroDeserializer.java rename to src/main/java/org/akhq/utils/AvroSchemaDeserializer.java index a9624f7d6..7159125aa 100644 --- a/src/main/java/org/akhq/utils/AvroDeserializer.java +++ b/src/main/java/org/akhq/utils/AvroSchemaDeserializer.java @@ -8,7 +8,7 @@ import java.io.IOException; -public class AvroDeserializer extends JsonDeserializer { +public class AvroSchemaDeserializer extends JsonDeserializer { @Override public Schema deserialize( JsonParser p, diff --git a/src/main/java/org/akhq/utils/AvroSerializer.java b/src/main/java/org/akhq/utils/AvroSchemaSerializer.java similarity index 88% rename from src/main/java/org/akhq/utils/AvroSerializer.java rename to src/main/java/org/akhq/utils/AvroSchemaSerializer.java index 432fa42f8..c732e4c6c 100644 --- a/src/main/java/org/akhq/utils/AvroSerializer.java +++ b/src/main/java/org/akhq/utils/AvroSchemaSerializer.java @@ -8,7 +8,7 @@ import java.io.IOException; -public class AvroSerializer extends JsonSerializer { +public class AvroSchemaSerializer extends JsonSerializer { @Override public void serialize( Schema value, diff --git a/src/main/java/org/akhq/utils/AvroToJsonSerializer.java b/src/main/java/org/akhq/utils/AvroToJsonSerializer.java new file mode 100644 index 000000000..87fd11b1b --- /dev/null +++ b/src/main/java/org/akhq/utils/AvroToJsonSerializer.java @@ -0,0 +1,63 @@ +package org.akhq.utils; + +import org.apache.avro.*; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.io.JsonEncoder; +import org.apache.avro.specific.SpecificDatumWriter; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; + +public class AvroToJsonSerializer { + + public static String toJson(GenericRecord record) throws IOException { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(record.getSchema(), outputStream); + new BigDecimalFriendlySpecificDatumWriter(record.getSchema()).write(record, jsonEncoder); + jsonEncoder.flush(); + return new String(outputStream.toByteArray()); + } + } +} + +class BigDecimalFriendlySpecificDatumWriter extends SpecificDatumWriter { + + private static final Conversion DECIMAL_CONVERSION = new Conversions.DecimalConversion(); + + public BigDecimalFriendlySpecificDatumWriter(Schema schema) { + super(schema); + } + + @Override + protected void writeField(Object datum, Schema.Field f, Encoder out, Object state) throws IOException { + if (datum instanceof GenericData.Record) { + Schema fieldSchema = f.schema(); + LogicalType logicalType = fieldSchema.getLogicalType(); + Object value = getData().getField(datum, f.name(), f.pos()); + if (logicalType instanceof LogicalTypes.Decimal) { + value = convert(DECIMAL_CONVERSION, fieldSchema, logicalType, value); + } + writeWithoutConversion(fieldSchema, value, out); + } else { + super.writeField(datum, f, out, state); + } + } + + private Object convert(Conversion conversion, Schema fieldSchema, LogicalType logicalType, Object value) { + if (conversion instanceof Conversions.DecimalConversion && value instanceof ByteBuffer) { + // convert decimal value to a string + byte[] byteValue = new byte[((ByteBuffer) value).remaining()]; + ((ByteBuffer) value).get(byteValue); + BigDecimal number = (BigDecimal) conversion.fromBytes(ByteBuffer.wrap(byteValue), fieldSchema, logicalType); + return (ByteBuffer.wrap(number.toPlainString().getBytes())); + } else { + return convert(fieldSchema, logicalType, conversion, value); + } + } +} + diff --git a/src/test/avro/Dog.avsc b/src/test/avro/Dog.avsc new file mode 100644 index 000000000..4eedded65 --- /dev/null +++ b/src/test/avro/Dog.avsc @@ -0,0 +1,10 @@ +{ + "name": "Dog", + "namespace": "org.akhq", + "type": "record", + "fields" : [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"}, + {"name": "weight", "type" : { "type" : "bytes", "logicalType" : "decimal", "precision" : 4, "scale" : 2}} + ] +} diff --git a/src/test/java/org/akhq/modules/AvroSerializerTest.java b/src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java similarity index 98% rename from src/test/java/org/akhq/modules/AvroSerializerTest.java rename to src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java index 28348c112..e0d0a7950 100644 --- a/src/test/java/org/akhq/modules/AvroSerializerTest.java +++ b/src/test/java/org/akhq/modules/AvroSchemaSerializerTest.java @@ -19,7 +19,7 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -class AvroSerializerTest { +class AvroSchemaSerializerTest { private final org.apache.avro.Schema SCHEMA = SchemaBuilder .record("schema1").namespace("org.akhq") diff --git a/src/test/java/org/akhq/modules/AvroToJsonSerializerTest.java b/src/test/java/org/akhq/modules/AvroToJsonSerializerTest.java new file mode 100644 index 000000000..c7050129c --- /dev/null +++ b/src/test/java/org/akhq/modules/AvroToJsonSerializerTest.java @@ -0,0 +1,54 @@ +package org.akhq.modules; + +import org.akhq.Breed; +import org.akhq.Cat; +import org.akhq.Dog; +import org.akhq.utils.AvroToJsonSerializer; +import org.apache.avro.Conversion; +import org.apache.avro.Conversions; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.math.BigDecimal; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class AvroToJsonSerializerTest { + + private static final Conversion DECIMAL_CONVERSION = new Conversions.DecimalConversion(); + + @Test + public void serializeAvroToJsonWithDecimal() throws IOException { + String expectedString = "{\"id\":10,\"name\":\"Tiger\",\"weight\":\"10.40\"}"; + GenericRecord dogExample = aDogExample(10, "Tiger", 10.40); + assertEquals(AvroToJsonSerializer.toJson(dogExample), expectedString); + } + + @Test + public void serializeAvroToJsonWithoutDecimal() throws IOException { + String expectedString = "{\"id\":10,\"name\":\"Tom\",\"breed\":\"SPHYNX\"}"; + GenericRecord catExample = aCatExample(10, "Tom", Breed.SPHYNX); + assertEquals(AvroToJsonSerializer.toJson(catExample), expectedString); + } + + private GenericRecord aCatExample(int id, String name, Breed breed) { + return new GenericRecordBuilder(Cat.SCHEMA$) + .set("id", id) + .set("name", name) + .set("breed", breed) + .build(); + } + + private GenericRecord aDogExample(int id, String name, double weight) { + Schema.Field weightField = Dog.SCHEMA$.getField("weight"); + return new GenericRecordBuilder(Dog.SCHEMA$) + .set("id", id) + .set("name", name) + .set("weight", DECIMAL_CONVERSION.toBytes(BigDecimal.valueOf(weight).setScale(2), weightField.schema(), weightField.schema().getLogicalType())) + .build(); + } + +} diff --git a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java index a80379cb5..d4a81db94 100644 --- a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java +++ b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java @@ -122,7 +122,7 @@ public void consumeAvro() throws ExecutionException, InterruptedException { .findFirst(); avroRecord.orElseThrow(() -> new NoSuchElementException("Unable to find key 1")); - avroRecord.ifPresent(record -> assertEquals("{\"id\": 1, \"name\": \"WaWa\", \"breed\": \"ABYSSINIAN\"}", record.getValue())); + avroRecord.ifPresent(record -> assertEquals("{\"id\":1,\"name\":\"WaWa\",\"breed\":\"ABYSSINIAN\"}", record.getValue())); } private List consumeAllRecord(RecordRepository.Options options) throws ExecutionException, InterruptedException {