From 404edd082e94c434debfd166305cb7dd0886bcc9 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Thu, 13 Nov 2025 13:39:53 +0400 Subject: [PATCH 1/3] Added CradleMessage.getContentBuffer method --- .../utils/GroupedMessageEntityUtils.java | 12 ++-- .../utils/MessageBatchEntityUtils.java | 12 ++-- .../messages/MessageTestUtils.java | 8 +-- .../cradle/messages/CradleMessage.java | 5 +- .../cradle/messages/MessageToStore.java | 9 ++- .../cradle/messages/StoredMessage.java | 55 +++++++++++++------ .../cradle/messages/StoredMessageBuilder.java | 7 ++- .../serialization/MessageDeserializer.java | 18 ++---- .../exactpro/cradle/utils/MessageUtils.java | 40 +------------- .../GroupedMessageBatchToStoreTest.java | 7 ++- .../messages/MessageBatchToStoreTest.java | 7 ++- .../cradle/utils/MessageUtilsTest.java | 5 +- .../utils/SerializationMessageTest.java | 26 ++++----- gradle.properties | 2 +- 14 files changed, 104 insertions(+), 109 deletions(-) diff --git a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/utils/GroupedMessageEntityUtils.java b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/utils/GroupedMessageEntityUtils.java index 561b4806f..2c125bbb1 100644 --- a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/utils/GroupedMessageEntityUtils.java +++ b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/utils/GroupedMessageEntityUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -89,21 +89,21 @@ public static SerializedEntity storedMessages = MessageUtils.deserializeMessages(content, pageId.getBookId()); return new StoredGroupedMessageBatch(entity.getGroup(), storedMessages, pageId, entity.getRecDate()); } - private static byte[] restoreContent(GroupedMessageBatchEntity entity, String group) throws CompressException { + private static ByteBuffer restoreContent(GroupedMessageBatchEntity entity, String group) throws CompressException { ByteBuffer content = entity.getContent(); if (content == null) return null; - byte[] result = content.array(); if (entity.isCompressed()) { logger.trace("Decompressing content of grouped message batch '{}'", group); - return CompressionType.decompressData(result); + ByteBuffer result = ByteBuffer.allocate(entity.getUncompressedContentSize()); + return CompressionType.decompressData(content, result); } - return result; + return content; } } diff --git a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/utils/MessageBatchEntityUtils.java b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/utils/MessageBatchEntityUtils.java index 23c04b729..7561f2fdb 100644 --- a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/utils/MessageBatchEntityUtils.java +++ b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/utils/MessageBatchEntityUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,7 +91,7 @@ public static StoredMessageBatch toStoredMessageBatch(MessageBatchEntity entity, StoredMessageId batchId = createId(entity, pageId.getBookId()); logger.debug("Creating message batch '{}' from entity", batchId); - byte[] content = restoreContent(entity, batchId); + ByteBuffer content = restoreContent(entity, batchId); List storedMessages = MessageUtils.deserializeMessages(content, batchId); return new StoredMessageBatch(storedMessages, pageId, entity.getRecDate()); } @@ -106,16 +106,16 @@ private static StoredMessageId createId(MessageBatchEntity entity, BookId bookId } - private static byte[] restoreContent(MessageBatchEntity entity, StoredMessageId messageBatchId) throws CompressException { + private static ByteBuffer restoreContent(MessageBatchEntity entity, StoredMessageId messageBatchId) throws CompressException { ByteBuffer content = entity.getContent(); if (content == null) return null; - byte[] result = content.array(); if (entity.isCompressed()) { logger.trace("Decompressing content of message batch '{}'", messageBatchId); - return CompressionType.decompressData(result); + ByteBuffer result = ByteBuffer.allocate(entity.getUncompressedContentSize()); + return CompressionType.decompressData(content, result); } - return result; + return content; } } diff --git a/cradle-cassandra/src/test/java/com/exactpro/cradle/cassandra/integration/messages/MessageTestUtils.java b/cradle-cassandra/src/test/java/com/exactpro/cradle/cassandra/integration/messages/MessageTestUtils.java index 7cfee50ba..070cd8fce 100644 --- a/cradle-cassandra/src/test/java/com/exactpro/cradle/cassandra/integration/messages/MessageTestUtils.java +++ b/cradle-cassandra/src/test/java/com/exactpro/cradle/cassandra/integration/messages/MessageTestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,7 @@ public static StoredGroupedMessageBatch groupedMessageBatchToStored (PageId page .setDirection(el.getDirection()) .setTimestamp(el.getTimestamp()) .setIndex(el.getSequence()) - .setContent(el.getContent()) + .setContent(el.getContentBuffer()) .setProtocol(el.getProtocol()) .build()).collect(Collectors.toList()), pageId, @@ -49,7 +49,7 @@ public static StoredMessageBatch messageBatchToStored (PageId pageId, Instant re .setDirection(el.getDirection()) .setTimestamp(el.getTimestamp()) .setIndex(el.getSequence()) - .setContent(el.getContent()) + .setContent(el.getContentBuffer()) .setProtocol(el.getProtocol()) .build()).collect(Collectors.toList()), pageId, @@ -63,7 +63,7 @@ public static StoredMessage messageToStored(StoredMessage storedMessage, PageId .setDirection(storedMessage.getDirection()) .setTimestamp(storedMessage.getTimestamp()) .setIndex(storedMessage.getSequence()) - .setContent(storedMessage.getContent()) + .setContent(storedMessage.getContentBuffer()) .setProtocol(storedMessage.getProtocol()) .setPageId(pageId) .build(); diff --git a/cradle-core/src/main/java/com/exactpro/cradle/messages/CradleMessage.java b/cradle-core/src/main/java/com/exactpro/cradle/messages/CradleMessage.java index 6c2ebbb78..6bf29244f 100644 --- a/cradle-core/src/main/java/com/exactpro/cradle/messages/CradleMessage.java +++ b/cradle-core/src/main/java/com/exactpro/cradle/messages/CradleMessage.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import com.exactpro.cradle.Direction; import com.exactpro.cradle.serialization.MessagesSizeCalculator; +import java.nio.ByteBuffer; import java.time.Instant; public interface CradleMessage { @@ -31,8 +32,10 @@ public interface CradleMessage { /** * @return message content */ + @Deprecated(since = "5.8.0") byte[] getContent(); + ByteBuffer getContentBuffer(); /** * @return grpc protocol */ diff --git a/cradle-core/src/main/java/com/exactpro/cradle/messages/MessageToStore.java b/cradle-core/src/main/java/com/exactpro/cradle/messages/MessageToStore.java index 429920e46..8f6f8dc81 100644 --- a/cradle-core/src/main/java/com/exactpro/cradle/messages/MessageToStore.java +++ b/cradle-core/src/main/java/com/exactpro/cradle/messages/MessageToStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import com.exactpro.cradle.utils.CradleStorageException; import com.exactpro.cradle.utils.MessageUtils; +import java.nio.ByteBuffer; import java.util.Arrays; /** @@ -74,6 +75,12 @@ public byte[] getContent() { return content; } + @Override + public ByteBuffer getContentBuffer() { + if (content == null) { return null; } + return ByteBuffer.wrap(content); + } + @Override public String getProtocol() { return protocol; diff --git a/cradle-core/src/main/java/com/exactpro/cradle/messages/StoredMessage.java b/cradle-core/src/main/java/com/exactpro/cradle/messages/StoredMessage.java index 00ecbf902..dfb367088 100644 --- a/cradle-core/src/main/java/com/exactpro/cradle/messages/StoredMessage.java +++ b/cradle-core/src/main/java/com/exactpro/cradle/messages/StoredMessage.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,9 +22,11 @@ import com.exactpro.cradle.serialization.MessagesSizeCalculator; import java.io.Serializable; +import java.nio.ByteBuffer; import java.time.Instant; import java.util.Arrays; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; /** * Holds information about one message stored in Cradle. @@ -33,14 +35,16 @@ public class StoredMessage implements Serializable, CradleMessage { private static final long serialVersionUID = 5602557739148866986L; private final StoredMessageId id; - private final byte[] content; + private final ByteBuffer contentBuffer; private final StoredMessageMetadata metadata; private final PageId pageId; private final String protocol; private final int serializedSize; + private final AtomicReference content = new AtomicReference<>(); + public StoredMessage(CradleMessage message, StoredMessageId id, PageId pageId) { - this(id, message.getProtocol(), message.getContent(), message.getMetadata() != null + this(id, message.getProtocol(), message.getContentBuffer(), message.getMetadata() != null ? new StoredMessageMetadata(message.getMetadata()) : null, pageId); } @@ -48,10 +52,10 @@ public StoredMessage(StoredMessage copyFrom) { this(copyFrom, copyFrom.getId(), copyFrom.getPageId()); } - protected StoredMessage(StoredMessageId id, String protocol, byte[] content, StoredMessageMetadata metadata, PageId pageId) { + protected StoredMessage(StoredMessageId id, String protocol, ByteBuffer content, StoredMessageMetadata metadata, PageId pageId) { this.id = id; this.protocol = protocol; - this.content = content; + this.contentBuffer = content; this.metadata = metadata; this.pageId = pageId; this.serializedSize = MessagesSizeCalculator.calculateMessageSizeInBatch(this); @@ -92,7 +96,21 @@ public long getSequence() { @Override public byte[] getContent() { - return content; + if (contentBuffer == null) { return null; } + return content.accumulateAndGet(null, (curr, x) -> { + if (curr == null) { + ByteBuffer buffer = getContentBuffer(); + byte[] result = new byte[buffer.remaining()]; + buffer.get(result); + return result; + } + return curr; + }); + } + + @Override + public ByteBuffer getContentBuffer() { + return contentBuffer == null ? null : contentBuffer.asReadOnlyBuffer(); } @Override @@ -116,7 +134,13 @@ public int getSerializedSize() { @Override public int hashCode() { - return Objects.hash(id, metadata, pageId, protocol, Arrays.hashCode(content)); + int result = Objects.hashCode(getId()); + result = 31 * result + Objects.hashCode(id); + result = 31 * result + Objects.hashCode(pageId); + result = 31 * result + Objects.hashCode(protocol); + result = 31 * result + Objects.hashCode(metadata); + result = 31 * result + Objects.hashCode(contentBuffer); + return result; } @Override @@ -132,18 +156,17 @@ public boolean equals(Object obj) { Objects.equals(pageId, other.pageId) && Objects.equals(protocol, other.protocol) && Objects.equals(metadata, other.metadata) && - Arrays.equals(content, other.content); + Objects.equals(contentBuffer, other.contentBuffer); } @Override public String toString() { - return new StringBuilder() - .append("StoredMessage{").append(System.lineSeparator()) - .append("id=").append(id).append(",").append(System.lineSeparator()) - .append("content=").append(Arrays.toString(getContent())).append(System.lineSeparator()) - .append("metadata=").append(getMetadata()).append(",").append(System.lineSeparator()) - .append("protocol=").append(getProtocol()).append(",").append(System.lineSeparator()) - .append("pageId=").append(getPageId()).append(System.lineSeparator()) - .append("}").toString(); + return "StoredMessage{" + System.lineSeparator() + + "id=" + id + "," + System.lineSeparator() + + "content=" + Arrays.toString(getContent()) + System.lineSeparator() + + "metadata=" + getMetadata() + "," + System.lineSeparator() + + "protocol=" + getProtocol() + "," + System.lineSeparator() + + "pageId=" + getPageId() + System.lineSeparator() + + "}"; } } diff --git a/cradle-core/src/main/java/com/exactpro/cradle/messages/StoredMessageBuilder.java b/cradle-core/src/main/java/com/exactpro/cradle/messages/StoredMessageBuilder.java index 908290f2f..0db2a981a 100644 --- a/cradle-core/src/main/java/com/exactpro/cradle/messages/StoredMessageBuilder.java +++ b/cradle-core/src/main/java/com/exactpro/cradle/messages/StoredMessageBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import com.exactpro.cradle.Direction; import com.exactpro.cradle.PageId; +import java.nio.ByteBuffer; import java.time.Instant; import java.util.HashMap; import java.util.Map; @@ -43,7 +44,7 @@ public class StoredMessageBuilder { protected Map metadata; - private byte[] content; + private ByteBuffer content; public StoredMessageBuilder setSessionAlias(String sessionAlias) { this.sessionAlias = sessionAlias; @@ -78,7 +79,7 @@ public StoredMessageBuilder setTimestamp(Instant timestamp) { return this; } - public StoredMessageBuilder setContent(byte[] content) { + public StoredMessageBuilder setContent(ByteBuffer content) { this.content = content; return this; } diff --git a/cradle-core/src/main/java/com/exactpro/cradle/serialization/MessageDeserializer.java b/cradle-core/src/main/java/com/exactpro/cradle/serialization/MessageDeserializer.java index 3aed4293f..a5b3ea327 100644 --- a/cradle-core/src/main/java/com/exactpro/cradle/serialization/MessageDeserializer.java +++ b/cradle-core/src/main/java/com/exactpro/cradle/serialization/MessageDeserializer.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,16 +28,16 @@ import static com.exactpro.cradle.serialization.Serialization.MessageBatchConst.*; import static com.exactpro.cradle.serialization.Serialization.NOT_SUPPORTED_PROTOCOL_FORMAT; +import static com.exactpro.cradle.serialization.SerializationUtils.readBufferedBody; import static com.exactpro.cradle.serialization.SerializationUtils.readInstant; import static com.exactpro.cradle.serialization.SerializationUtils.readString; -import static com.exactpro.cradle.serialization.SerializationUtils.readBody; public class MessageDeserializer { - public boolean checkMessageBatchHeader(byte[] array) { + public boolean checkMessageBatchHeader(byte[] array) { return ByteBuffer.wrap(array, 0, 4).getInt() == MESSAGE_BATCH_MAGIC; } - + public List deserializeBatch(byte[] buffer, MessageCommonParams commonParams) throws SerializationException { return this.deserializeBatch(ByteBuffer.wrap(buffer), commonParams); } @@ -102,17 +102,9 @@ public StoredMessage deserialize(ByteBuffer buffer, MessageCommonParams commonPa builder.setMessageId(readMessageId(buffer, commonParams)); builder.setProtocol(readString(buffer)); readMessageMetaData(buffer, builder); - builder.setContent(readBody(buffer)); + builder.setContent(readBufferedBody(buffer)); return builder.build(); } - - private Direction getDirection(int ordinal) throws SerializationException { - Direction[] values = Direction.values(); - if (values.length > ordinal && ordinal >= 0) - return values[ordinal]; - throw new SerializationException(String.format("Invalid ordinal for enum (Direction): %d. Values: [0-%d]", - ordinal, values.length - 1)); - } private StoredMessageId readMessageId(ByteBuffer buffer, MessageCommonParams commonParams) throws SerializationException { String sessionAlias = readString(buffer); diff --git a/cradle-core/src/main/java/com/exactpro/cradle/utils/MessageUtils.java b/cradle-core/src/main/java/com/exactpro/cradle/utils/MessageUtils.java index ce537fb9a..09f6d23da 100644 --- a/cradle-core/src/main/java/com/exactpro/cradle/utils/MessageUtils.java +++ b/cradle-core/src/main/java/com/exactpro/cradle/utils/MessageUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -111,7 +111,7 @@ public static StoredMessage deserializeOneMessage(byte[] contentBytes, StoredMes * @return collection of deserialized messages * @throws IOException if deserialization failed */ - public static List deserializeMessages(byte[] contentBytes, BookId bookId) throws IOException + public static List deserializeMessages(ByteBuffer contentBytes, BookId bookId) throws IOException { return deserializer.deserializeBatch(contentBytes, new MessageCommonParams(bookId)); } @@ -123,42 +123,8 @@ public static List deserializeMessages(byte[] contentBytes, BookI * @return collection of deserialized messages * @throws IOException if deserialization failed */ - public static List deserializeMessages(byte[] contentBytes, StoredMessageId batchId) throws IOException + public static List deserializeMessages(ByteBuffer contentBytes, StoredMessageId batchId) throws IOException { return deserializer.deserializeBatch(contentBytes, new MessageCommonParams(batchId)); } - - /** - * Decompresses given ByteBuffer and deserializes messages till message with needed ID is found - * @param content to deserialize needed message from - * @param compressed flag that indicates if content needs to be decompressed first - * @param id of message to find - * @return deserialized message, if found, null otherwise - * @throws IOException if deserialization failed - */ - public static StoredMessage bytesToOneMessage(ByteBuffer content, boolean compressed, StoredMessageId id) throws IOException, CompressException { - byte[] contentBytes = getMessageContentBytes(content, compressed); - return deserializeOneMessage(contentBytes, id); - } - - /** - * Decompresses given ByteBuffer and deserializes all messages - * @param content to deserialize messages from - * @param id batch id. Required to specify common message params like bookId, sessionAlias, direction - * @param compressed flag that indicates if content needs to be decompressed first - * @return collection of deserialized messages - * @throws IOException if deserialization failed - */ - public static List bytesToMessages(ByteBuffer content, StoredMessageId id, boolean compressed) throws IOException, CompressException { - byte[] contentBytes = getMessageContentBytes(content, compressed); - return deserializeMessages(contentBytes, id); - } - - private static byte[] getMessageContentBytes(ByteBuffer content, boolean compressed) throws CompressException { - byte[] contentBytes = content.array(); - if (!compressed) - return contentBytes; - - return CompressionType.decompressData(contentBytes); - } } diff --git a/cradle-core/src/test/java/com/exactpro/cradle/messages/GroupedMessageBatchToStoreTest.java b/cradle-core/src/test/java/com/exactpro/cradle/messages/GroupedMessageBatchToStoreTest.java index 11d0f4add..9e2930162 100644 --- a/cradle-core/src/test/java/com/exactpro/cradle/messages/GroupedMessageBatchToStoreTest.java +++ b/cradle-core/src/test/java/com/exactpro/cradle/messages/GroupedMessageBatchToStoreTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import org.testng.annotations.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Arrays; @@ -358,12 +359,12 @@ public void batchSerialization() throws CradleStorageException, IOException { StoredMessage storedMsg = batch.getFirstMessage(); byte[] bytes = MessageUtils.serializeMessages(batch).getSerializedData(); - StoredMessage msg = MessageUtils.deserializeMessages(bytes, book).iterator().next(); + StoredMessage msg = MessageUtils.deserializeMessages(ByteBuffer.wrap(bytes), book).iterator().next(); assertEquals(msg, storedMsg, "Message should be completely serialized/deserialized"); } - static class IdData { + public static class IdData { final BookId book; final String sessionAlias; final Direction direction; diff --git a/cradle-core/src/test/java/com/exactpro/cradle/messages/MessageBatchToStoreTest.java b/cradle-core/src/test/java/com/exactpro/cradle/messages/MessageBatchToStoreTest.java index 3870e3ae6..aad197338 100644 --- a/cradle-core/src/test/java/com/exactpro/cradle/messages/MessageBatchToStoreTest.java +++ b/cradle-core/src/test/java/com/exactpro/cradle/messages/MessageBatchToStoreTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import org.testng.annotations.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Arrays; @@ -354,12 +355,12 @@ public void batchSerialization() throws CradleStorageException, IOException { .build(), MAX_SIZE, storeActionRejectionThreshold); StoredMessage storedMsg = batch.getFirstMessage(); byte[] bytes = MessageUtils.serializeMessages(batch).getSerializedData(); - StoredMessage msg = MessageUtils.deserializeMessages(bytes, batch.id).iterator().next(); + StoredMessage msg = MessageUtils.deserializeMessages(ByteBuffer.wrap(bytes), batch.id).iterator().next(); Assert.assertEquals(msg, storedMsg, "Message should be completely serialized/deserialized"); } - static class IdData { + public static class IdData { final BookId book; final String sessionAlias; final Direction direction; diff --git a/cradle-core/src/test/java/com/exactpro/cradle/utils/MessageUtilsTest.java b/cradle-core/src/test/java/com/exactpro/cradle/utils/MessageUtilsTest.java index 76f197cfd..fdae5d7bc 100644 --- a/cradle-core/src/test/java/com/exactpro/cradle/utils/MessageUtilsTest.java +++ b/cradle-core/src/test/java/com/exactpro/cradle/utils/MessageUtilsTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import org.testng.annotations.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Instant; import java.util.Collection; import java.util.Iterator; @@ -70,7 +71,7 @@ public void prepare() throws CradleStorageException { public void messageIds() throws IOException { byte[] bytes = MessageUtils.serializeMessages(batch).getSerializedData(); - Collection restored = MessageUtils.deserializeMessages(bytes, batch.getId()); + Collection restored = MessageUtils.deserializeMessages(ByteBuffer.wrap(bytes), batch.getId()); Iterator it = restored.iterator(); Assert.assertEquals(it.next().getId(), msg1.getId(), "1st message ID"); diff --git a/cradle-core/src/test/java/com/exactpro/cradle/utils/SerializationMessageTest.java b/cradle-core/src/test/java/com/exactpro/cradle/utils/SerializationMessageTest.java index c4f5f065d..29f9c009d 100644 --- a/cradle-core/src/test/java/com/exactpro/cradle/utils/SerializationMessageTest.java +++ b/cradle-core/src/test/java/com/exactpro/cradle/utils/SerializationMessageTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ public void serializeDeserialize() throws SerializationException { builder.putMetadata("key1", "value1"); builder.putMetadata("key2", "value2"); builder.putMetadata("key3", "value3"); - builder.setContent("Message".repeat(10).getBytes(StandardCharsets.UTF_8)); + builder.setContent(ByteBuffer.wrap("Message".repeat(10).getBytes(StandardCharsets.UTF_8))); builder.setBookId(commonParams.getBookId()); StoredMessage build = builder.build(); MessageSerializer serializer = new MessageSerializer(); @@ -77,7 +77,7 @@ public void serializeDeserialize2() throws SerializationException { builder.setIndex(123456789010111213L); builder.setDirection(Direction.SECOND); builder.setTimestamp(Instant.parse("2007-12-03T10:15:30.00Z").plusNanos(51234)); - builder.setContent("Message".repeat(10).getBytes(StandardCharsets.UTF_8)); + builder.setContent(ByteBuffer.wrap("Message".repeat(10).getBytes(StandardCharsets.UTF_8))); builder.setBookId(commonParams.getBookId()); StoredMessage build = builder.build(); MessageSerializer serializer = new MessageSerializer(); @@ -102,7 +102,7 @@ public void serializeDeserialize5UnicodeCharacters() throws SerializationExcepti builder.putMetadata(metaDataKey, metaDataValue); builder.putMetadata(metaDataKey, metaDataValue); String messageContent = generateUnicodeString((1 << 17), 150); - builder.setContent(messageContent.repeat(10).getBytes(StandardCharsets.UTF_8)); + builder.setContent(ByteBuffer.wrap(messageContent.repeat(10).getBytes(StandardCharsets.UTF_8))); StoredMessage build = builder.build(); MessageSerializer serializer = new MessageSerializer(); byte[] serialized = serializer.serialize(build); @@ -123,7 +123,7 @@ public void serializeDeserializeEmptyBody() throws SerializationException { builder.setIndex(123456789010111213L); builder.setDirection(commonParams.getDirection()); builder.setTimestamp(Instant.parse("2007-12-03T10:15:30.00Z")); - builder.setContent(new byte[0]); + builder.setContent(ByteBuffer.allocate(0)); builder.setBookId(commonParams.getBookId()); StoredMessage build = builder.build(); MessageSerializer serializer = new MessageSerializer(); @@ -135,14 +135,14 @@ public void serializeDeserializeEmptyBody() throws SerializationException { } @Test - public void checkMessageLength() throws SerializationException { + public void checkMessageLength() { StoredMessageBuilder builder = new StoredMessageBuilder(); builder.setSessionAlias("stream_name12345"); builder.setIndex(123456789010111213L); builder.setDirection(Direction.SECOND); builder.setBookId(new BookId("book_name1234")); builder.setTimestamp(Instant.parse("2007-12-03T10:15:30.00Z").plusNanos(51234)); - builder.setContent("Message".repeat(10).getBytes(StandardCharsets.UTF_8)); + builder.setContent(ByteBuffer.wrap("Message".repeat(10).getBytes(StandardCharsets.UTF_8))); StoredMessage build = builder.build(); MessageSerializer serializer = new MessageSerializer(); @@ -174,29 +174,29 @@ static List getBatch(MessageCommonParams params) { builder.putMetadata("key1", "value1"); builder.putMetadata("key2", "value2"); builder.putMetadata("key3", "value3"); - builder.setContent("Message".repeat(10).getBytes(StandardCharsets.UTF_8)); + builder.setContent(ByteBuffer.wrap("Message".repeat(10).getBytes(StandardCharsets.UTF_8))); List stMessage = new ArrayList<>(10); stMessage.add(builder.build()); builder.setIndex(123456789010111214L); builder.setTimestamp(Instant.parse("2007-12-03T10:15:30.01Z")); - builder.setContent("Message".repeat(10).getBytes(StandardCharsets.UTF_8)); + builder.setContent(ByteBuffer.wrap("Message".repeat(10).getBytes(StandardCharsets.UTF_8))); stMessage.add(builder.build()); builder.setIndex(123456789010111215L); builder.setTimestamp(Instant.parse("2007-12-03T10:15:30.01Z")); - builder.setContent("Message".repeat(10).getBytes(StandardCharsets.UTF_8)); + builder.setContent(ByteBuffer.wrap("Message".repeat(10).getBytes(StandardCharsets.UTF_8))); stMessage.add(builder.build()); builder.setIndex(123456789010111216L); builder.setTimestamp(Instant.parse("2007-12-03T10:15:30.02Z")); - builder.setContent("Message2".repeat(10).getBytes(StandardCharsets.UTF_8)); + builder.setContent(ByteBuffer.wrap("Message2".repeat(10).getBytes(StandardCharsets.UTF_8))); stMessage.add(builder.build()); builder.setIndex(123456789010111217L); builder.setTimestamp(Instant.parse("2007-12-03T10:15:30.03Z")); - builder.setContent("Message3".repeat(10).getBytes(StandardCharsets.UTF_8)); + builder.setContent(ByteBuffer.wrap("Message3".repeat(10).getBytes(StandardCharsets.UTF_8))); stMessage.add(builder.build()); return stMessage; @@ -225,7 +225,7 @@ public void serializeDeserializeEmptyBatch() throws SerializationException { } @Test - public void checkMessageBatchLength() throws SerializationException { + public void checkMessageBatchLength() { MessageSerializer serializer = new MessageSerializer(); ByteBuffer buffer = ByteBuffer.allocate(10_000); diff --git a/gradle.properties b/gradle.properties index 82bc61445..fd2017a93 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=5.7.0 +release_version=5.8.0 description='Cradle API' vcs_url=https://github.com/th2-net/cradleapi \ No newline at end of file From e0c9e0ac5dd1a969cc369b8ed39d0e73a87dc460 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Thu, 13 Nov 2025 18:00:59 +0400 Subject: [PATCH 2/3] Used Google iterator instead of stream adapter --- .../FilteredGroupedMessageBatchIterator.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/dao/messages/FilteredGroupedMessageBatchIterator.java b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/dao/messages/FilteredGroupedMessageBatchIterator.java index 73d5cbe84..e84bfd2d3 100644 --- a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/dao/messages/FilteredGroupedMessageBatchIterator.java +++ b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/dao/messages/FilteredGroupedMessageBatchIterator.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,14 @@ package com.exactpro.cradle.cassandra.dao.messages; -import com.datastax.oss.driver.shaded.guava.common.collect.Streams; -import com.exactpro.cradle.filters.FilterForGreater; -import com.exactpro.cradle.filters.FilterForLess; import com.exactpro.cradle.messages.GroupedMessageFilter; import com.exactpro.cradle.messages.StoredGroupedMessageBatch; -import java.time.Instant; import java.util.Iterator; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; public class FilteredGroupedMessageBatchIterator extends MappedIterator { @@ -39,9 +37,7 @@ public FilteredGroupedMessageBatchIterator(Iterator s private static Iterator createTargetIterator(Iterator sourceIterator, GroupedMessageFilter filter) { Predicate filterPredicate = createFilterPredicate(filter); - return Streams.stream(sourceIterator) - .filter(filterPredicate) - .iterator(); + return Iterators.filter(sourceIterator, filterPredicate); } private static Predicate createFilterPredicate(GroupedMessageFilter filter) From 44c90e245fcd3eafa3443b784976b66d6e655c99 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Thu, 13 Nov 2025 18:09:50 +0400 Subject: [PATCH 3/3] Disabled exemplars for Prometheus counters --- .../cassandra/dao/testevents/TestEventEntityUtils.java | 4 ++++ .../exactpro/cradle/cassandra/workers/EventsWorker.java | 6 ++++++ .../exactpro/cradle/cassandra/workers/MessagesWorker.java | 8 +++++++- .../main/java/com/exactpro/cradle/BookInfoMetrics.java | 8 +++++++- 4 files changed, 24 insertions(+), 2 deletions(-) diff --git a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/dao/testevents/TestEventEntityUtils.java b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/dao/testevents/TestEventEntityUtils.java index dfa8b148b..83d29d737 100644 --- a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/dao/testevents/TestEventEntityUtils.java +++ b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/dao/testevents/TestEventEntityUtils.java @@ -59,21 +59,25 @@ public class TestEventEntityUtils { .name("cradle_test_event_restore_duration_seconds") .help("Time spent restoring batch / batched-event / single-event data from optionally compressed content") .labelNames(LABEL_TYPE) + .withoutExemplars() .register(); private static final Counter DESERIALISATION_DURATION = Counter.build() .name("cradle_test_event_deserialisation_duration_seconds") .help("Time spent deserializing batch / batched-event / single-event") .labelNames(LABEL_TYPE) + .withoutExemplars() .register(); private static final Counter CONTENT_SIZE = Counter.build() .name("cradle_test_event_deserialisation_content_bytes_total") .help("Total size of content processed during event deserialization") .labelNames(LABEL_TYPE) + .withoutExemplars() .register(); private static final Counter ITEMS_TOTAL = Counter.build() .name("cradle_test_event_deserialized_total") .help("Number of deserialized batch / batched-event / single-event") .labelNames(LABEL_TYPE) + .withoutExemplars() .register(); private static final Metric batchMetric = new Metric( diff --git a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/workers/EventsWorker.java b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/workers/EventsWorker.java index 8476cc4d6..ba9470c71 100644 --- a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/workers/EventsWorker.java +++ b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/workers/EventsWorker.java @@ -68,6 +68,7 @@ public class EventsWorker extends Worker { .name("cradle_test_events_read_total") .help("Fetched test events") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); private static final MetricHolder EVENT_BATCHES_READ_METRIC = new MetricHolder<>( @@ -75,6 +76,7 @@ public class EventsWorker extends Worker { .name("cradle_test_event_batches_read_total") .help("Fetched test event batches") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); private static final MetricHolder EVENTS_STORE_METRIC = new MetricHolder<>( @@ -82,6 +84,7 @@ public class EventsWorker extends Worker { .name("cradle_test_events_stored_total") .help("Stored test events") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); private static final MetricHolder EVENT_BATCHES_STORE_METRIC = new MetricHolder<>( @@ -89,6 +92,7 @@ public class EventsWorker extends Worker { .name("cradle_test_event_batches_stored_total") .help("Stored test event batches") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); private static final MetricHolder EVENTS_STORE_UNCOMPRESSED_BYTES = new MetricHolder<>( @@ -96,6 +100,7 @@ public class EventsWorker extends Worker { .name("cradle_test_events_stored_uncompressed_bytes_total") .help("Stored uncompressed event bytes") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); private static final MetricHolder EVENTS_STORE_COMPRESSED_BYTES = new MetricHolder<>( @@ -103,6 +108,7 @@ public class EventsWorker extends Worker { .name("cradle_test_events_stored_compressed_bytes_total") .help("Stored compressed event bytes") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); diff --git a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/workers/MessagesWorker.java b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/workers/MessagesWorker.java index bb67a39ff..604e0fc08 100644 --- a/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/workers/MessagesWorker.java +++ b/cradle-cassandra/src/main/java/com/exactpro/cradle/cassandra/workers/MessagesWorker.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2024 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -98,6 +98,7 @@ public class MessagesWorker extends Worker { .name("cradle_message_read_total") .help("Fetched messages") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); private static final MetricHolder MESSAGE_BATCH_READ_METRIC = new MetricHolder<>( @@ -105,6 +106,7 @@ public class MessagesWorker extends Worker { .name("cradle_message_batch_read_total") .help("Fetched message batches") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); private static final MetricHolder MESSAGE_STORE_METRIC = new MetricHolder<>( @@ -112,6 +114,7 @@ public class MessagesWorker extends Worker { .name("cradle_message_stored_total") .help("Stored messages") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); private static final MetricHolder MESSAGE_BATCH_STORE_METRIC = new MetricHolder<>( @@ -119,6 +122,7 @@ public class MessagesWorker extends Worker { .name("cradle_message_batch_stored_total") .help("Stored message batches") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); private static final MetricHolder MESSAGE_STORE_UNCOMPRESSED_BYTES = new MetricHolder<>( @@ -126,6 +130,7 @@ public class MessagesWorker extends Worker { .name("cradle_message_stored_uncompressed_bytes_total") .help("Stored uncompressed message bytes") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); private static final MetricHolder MESSAGE_STORE_COMPRESSED_BYTES = new MetricHolder<>( @@ -133,6 +138,7 @@ public class MessagesWorker extends Worker { .name("cradle_message_stored_compressed_bytes_total") .help("Stored compressed message bytes") .labelNames(StreamLabel.LABEL_NAMES) + .withoutExemplars() .register() ); diff --git a/cradle-core/src/main/java/com/exactpro/cradle/BookInfoMetrics.java b/cradle-core/src/main/java/com/exactpro/cradle/BookInfoMetrics.java index 8e7914fff..affe404d5 100644 --- a/cradle-core/src/main/java/com/exactpro/cradle/BookInfoMetrics.java +++ b/cradle-core/src/main/java/com/exactpro/cradle/BookInfoMetrics.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 Exactpro (Exactpro Systems Limited) + * Copyright 2024-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,12 +40,14 @@ public class BookInfoMetrics { .name("cradle_page_cache_page_request_total") .help("Page requests number from cache") .labelNames(BOOK_LABEL, CACHE_NAME_LABEL, REQUEST_METHOD_LABEL) + .withoutExemplars() .register(); private static final Map PAGE_REQUEST_MAP = new ConcurrentHashMap<>(); private static final Counter INVALIDATE_CACHE_COUNTER = Counter.build() .name("cradle_page_cache_invalidate_total") .help("Cache invalidates") .labelNames(BOOK_LABEL, CACHE_NAME_LABEL, INVALIDATE_CAUSE_LABEL) + .withoutExemplars() .register(); private static final Map INVALIDATE_CACHE_MAP = new ConcurrentHashMap<>(); @@ -59,6 +61,7 @@ public class BookInfoMetrics { private BookInfoMetrics() {} + @SuppressWarnings("unused") public static double getPageCacheSize(BookId bookIdId, CacheName cacheName) { Gauge.Child child = PAGE_CACHE_SIZE_MAP.get(new LoadsKey(bookIdId, cacheName)); return child == null ? 0.0 : child.get(); @@ -72,6 +75,7 @@ public static void setPageCacheSize(BookId bookIdId, CacheName cacheName, int va ).set(value); } + @SuppressWarnings("unused") public static double getRequest(BookId bookId, CacheName cacheName, RequestMethod method) { Counter.Child child = PAGE_REQUEST_MAP.get(new PageRequestKey(bookId, cacheName, method)); return child == null ? 0.0 : child.get(); @@ -86,6 +90,7 @@ public static void incRequest(BookId bookId, CacheName cacheName, RequestMethod ).inc(); } + @SuppressWarnings("unused") public static double getInvalidate(BookId bookId, CacheName cacheName, RemovalCause cause) { Counter.Child child = INVALIDATE_CACHE_MAP.get(new InvalidateKey(bookId, cacheName, cause)); return child == null ? 0.0 : child.get(); @@ -100,6 +105,7 @@ public static double getLoadCount(BookId bookIdId, CacheName cacheName) { Summary.Child child = PAGE_LOADS_MAP.get(new LoadsKey(bookIdId, cacheName)); return child == null ? 0.0 : child.get().count; } + @SuppressWarnings("unused") public static double getLoadSum(BookId bookIdId, CacheName cacheName) { Summary.Child child = PAGE_LOADS_MAP.get(new LoadsKey(bookIdId, cacheName)); return child == null ? 0.0 : child.get().sum;