Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2025 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,16 +16,14 @@

package com.exactpro.cradle.cassandra.dao.messages;

import com.datastax.oss.driver.shaded.guava.common.collect.Streams;
import com.exactpro.cradle.filters.FilterForGreater;
import com.exactpro.cradle.filters.FilterForLess;
import com.exactpro.cradle.messages.GroupedMessageFilter;
import com.exactpro.cradle.messages.StoredGroupedMessageBatch;

import java.time.Instant;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;

public class FilteredGroupedMessageBatchIterator extends MappedIterator<StoredGroupedMessageBatch, StoredGroupedMessageBatch>
{
Expand All @@ -39,9 +37,7 @@ public FilteredGroupedMessageBatchIterator(Iterator<StoredGroupedMessageBatch> s
private static Iterator<StoredGroupedMessageBatch> createTargetIterator(Iterator<StoredGroupedMessageBatch> sourceIterator, GroupedMessageFilter filter)
{
Predicate<StoredGroupedMessageBatch> filterPredicate = createFilterPredicate(filter);
return Streams.stream(sourceIterator)
.filter(filterPredicate)
.iterator();
return Iterators.filter(sourceIterator, filterPredicate);
}

private static Predicate<StoredGroupedMessageBatch> createFilterPredicate(GroupedMessageFilter filter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,25 @@ public class TestEventEntityUtils {
.name("cradle_test_event_restore_duration_seconds")
.help("Time spent restoring batch / batched-event / single-event data from optionally compressed content")
.labelNames(LABEL_TYPE)
.withoutExemplars()
.register();
private static final Counter DESERIALISATION_DURATION = Counter.build()
.name("cradle_test_event_deserialisation_duration_seconds")
.help("Time spent deserializing batch / batched-event / single-event")
.labelNames(LABEL_TYPE)
.withoutExemplars()
.register();
private static final Counter CONTENT_SIZE = Counter.build()
.name("cradle_test_event_deserialisation_content_bytes_total")
.help("Total size of content processed during event deserialization")
.labelNames(LABEL_TYPE)
.withoutExemplars()
.register();
private static final Counter ITEMS_TOTAL = Counter.build()
.name("cradle_test_event_deserialized_total")
.help("Number of deserialized batch / batched-event / single-event")
.labelNames(LABEL_TYPE)
.withoutExemplars()
.register();

private static final Metric batchMetric = new Metric(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2025 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -89,21 +89,21 @@ public static SerializedEntity<SerializedMessageMetadata, GroupedMessageBatchEnt
public static StoredGroupedMessageBatch toStoredGroupedMessageBatch(GroupedMessageBatchEntity entity, PageId pageId) throws DataFormatException, IOException, CompressException {
logger.debug("Creating grouped message batch from entity");

byte[] content = restoreContent(entity, entity.getGroup());
ByteBuffer content = restoreContent(entity, entity.getGroup());
List<StoredMessage> storedMessages = MessageUtils.deserializeMessages(content, pageId.getBookId());
return new StoredGroupedMessageBatch(entity.getGroup(), storedMessages, pageId, entity.getRecDate());
}

private static byte[] restoreContent(GroupedMessageBatchEntity entity, String group) throws CompressException {
private static ByteBuffer restoreContent(GroupedMessageBatchEntity entity, String group) throws CompressException {
ByteBuffer content = entity.getContent();
if (content == null)
return null;

byte[] result = content.array();
if (entity.isCompressed()) {
logger.trace("Decompressing content of grouped message batch '{}'", group);
return CompressionType.decompressData(result);
ByteBuffer result = ByteBuffer.allocate(entity.getUncompressedContentSize());
return CompressionType.decompressData(content, result);
}
return result;
return content;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2025 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,7 +91,7 @@ public static StoredMessageBatch toStoredMessageBatch(MessageBatchEntity entity,
StoredMessageId batchId = createId(entity, pageId.getBookId());
logger.debug("Creating message batch '{}' from entity", batchId);

byte[] content = restoreContent(entity, batchId);
ByteBuffer content = restoreContent(entity, batchId);
List<StoredMessage> storedMessages = MessageUtils.deserializeMessages(content, batchId);
return new StoredMessageBatch(storedMessages, pageId, entity.getRecDate());
}
Expand All @@ -106,16 +106,16 @@ private static StoredMessageId createId(MessageBatchEntity entity, BookId bookId
}


private static byte[] restoreContent(MessageBatchEntity entity, StoredMessageId messageBatchId) throws CompressException {
private static ByteBuffer restoreContent(MessageBatchEntity entity, StoredMessageId messageBatchId) throws CompressException {
ByteBuffer content = entity.getContent();
if (content == null)
return null;

byte[] result = content.array();
if (entity.isCompressed()) {
logger.trace("Decompressing content of message batch '{}'", messageBatchId);
return CompressionType.decompressData(result);
ByteBuffer result = ByteBuffer.allocate(entity.getUncompressedContentSize());
return CompressionType.decompressData(content, result);
}
return result;
return content;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,41 +68,47 @@ public class EventsWorker extends Worker {
.name("cradle_test_events_read_total")
.help("Fetched test events")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);
private static final MetricHolder<StreamLabel> EVENT_BATCHES_READ_METRIC = new MetricHolder<>(
Counter.build()
.name("cradle_test_event_batches_read_total")
.help("Fetched test event batches")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);
private static final MetricHolder<StreamLabel> EVENTS_STORE_METRIC = new MetricHolder<>(
Counter.build()
.name("cradle_test_events_stored_total")
.help("Stored test events")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);
private static final MetricHolder<StreamLabel> EVENT_BATCHES_STORE_METRIC = new MetricHolder<>(
Counter.build()
.name("cradle_test_event_batches_stored_total")
.help("Stored test event batches")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);
private static final MetricHolder<StreamLabel> EVENTS_STORE_UNCOMPRESSED_BYTES = new MetricHolder<>(
Counter.build()
.name("cradle_test_events_stored_uncompressed_bytes_total")
.help("Stored uncompressed event bytes")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);
private static final MetricHolder<StreamLabel> EVENTS_STORE_COMPRESSED_BYTES = new MetricHolder<>(
Counter.build()
.name("cradle_test_events_stored_compressed_bytes_total")
.help("Stored compressed event bytes")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2025 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,41 +98,47 @@ public class MessagesWorker extends Worker {
.name("cradle_message_read_total")
.help("Fetched messages")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);
private static final MetricHolder<StreamLabel> MESSAGE_BATCH_READ_METRIC = new MetricHolder<>(
Counter.build()
.name("cradle_message_batch_read_total")
.help("Fetched message batches")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);
private static final MetricHolder<StreamLabel> MESSAGE_STORE_METRIC = new MetricHolder<>(
Counter.build()
.name("cradle_message_stored_total")
.help("Stored messages")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);
private static final MetricHolder<StreamLabel> MESSAGE_BATCH_STORE_METRIC = new MetricHolder<>(
Counter.build()
.name("cradle_message_batch_stored_total")
.help("Stored message batches")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);
private static final MetricHolder<StreamLabel> MESSAGE_STORE_UNCOMPRESSED_BYTES = new MetricHolder<>(
Counter.build()
.name("cradle_message_stored_uncompressed_bytes_total")
.help("Stored uncompressed message bytes")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);
private static final MetricHolder<StreamLabel> MESSAGE_STORE_COMPRESSED_BYTES = new MetricHolder<>(
Counter.build()
.name("cradle_message_stored_compressed_bytes_total")
.help("Stored compressed message bytes")
.labelNames(StreamLabel.LABEL_NAMES)
.withoutExemplars()
.register()
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2022-2025 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,7 @@ public static StoredGroupedMessageBatch groupedMessageBatchToStored (PageId page
.setDirection(el.getDirection())
.setTimestamp(el.getTimestamp())
.setIndex(el.getSequence())
.setContent(el.getContent())
.setContent(el.getContentBuffer())
.setProtocol(el.getProtocol())
.build()).collect(Collectors.toList()),
pageId,
Expand All @@ -49,7 +49,7 @@ public static StoredMessageBatch messageBatchToStored (PageId pageId, Instant re
.setDirection(el.getDirection())
.setTimestamp(el.getTimestamp())
.setIndex(el.getSequence())
.setContent(el.getContent())
.setContent(el.getContentBuffer())
.setProtocol(el.getProtocol())
.build()).collect(Collectors.toList()),
pageId,
Expand All @@ -63,7 +63,7 @@ public static StoredMessage messageToStored(StoredMessage storedMessage, PageId
.setDirection(storedMessage.getDirection())
.setTimestamp(storedMessage.getTimestamp())
.setIndex(storedMessage.getSequence())
.setContent(storedMessage.getContent())
.setContent(storedMessage.getContentBuffer())
.setProtocol(storedMessage.getProtocol())
.setPageId(pageId)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
* Copyright 2024-2025 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,12 +40,14 @@ public class BookInfoMetrics {
.name("cradle_page_cache_page_request_total")
.help("Page requests number from cache")
.labelNames(BOOK_LABEL, CACHE_NAME_LABEL, REQUEST_METHOD_LABEL)
.withoutExemplars()
.register();
private static final Map<PageRequestKey, Counter.Child> PAGE_REQUEST_MAP = new ConcurrentHashMap<>();
private static final Counter INVALIDATE_CACHE_COUNTER = Counter.build()
.name("cradle_page_cache_invalidate_total")
.help("Cache invalidates")
.labelNames(BOOK_LABEL, CACHE_NAME_LABEL, INVALIDATE_CAUSE_LABEL)
.withoutExemplars()
.register();
private static final Map<InvalidateKey, Counter.Child> INVALIDATE_CACHE_MAP = new ConcurrentHashMap<>();

Expand All @@ -59,6 +61,7 @@ public class BookInfoMetrics {

private BookInfoMetrics() {}

@SuppressWarnings("unused")
public static double getPageCacheSize(BookId bookIdId, CacheName cacheName) {
Gauge.Child child = PAGE_CACHE_SIZE_MAP.get(new LoadsKey(bookIdId, cacheName));
return child == null ? 0.0 : child.get();
Expand All @@ -72,6 +75,7 @@ public static void setPageCacheSize(BookId bookIdId, CacheName cacheName, int va
).set(value);
}

@SuppressWarnings("unused")
public static double getRequest(BookId bookId, CacheName cacheName, RequestMethod method) {
Counter.Child child = PAGE_REQUEST_MAP.get(new PageRequestKey(bookId, cacheName, method));
return child == null ? 0.0 : child.get();
Expand All @@ -86,6 +90,7 @@ public static void incRequest(BookId bookId, CacheName cacheName, RequestMethod
).inc();
}

@SuppressWarnings("unused")
public static double getInvalidate(BookId bookId, CacheName cacheName, RemovalCause cause) {
Counter.Child child = INVALIDATE_CACHE_MAP.get(new InvalidateKey(bookId, cacheName, cause));
return child == null ? 0.0 : child.get();
Expand All @@ -100,6 +105,7 @@ public static double getLoadCount(BookId bookIdId, CacheName cacheName) {
Summary.Child child = PAGE_LOADS_MAP.get(new LoadsKey(bookIdId, cacheName));
return child == null ? 0.0 : child.get().count;
}
@SuppressWarnings("unused")
public static double getLoadSum(BookId bookIdId, CacheName cacheName) {
Summary.Child child = PAGE_LOADS_MAP.get(new LoadsKey(bookIdId, cacheName));
return child == null ? 0.0 : child.get().sum;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2025 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import com.exactpro.cradle.Direction;
import com.exactpro.cradle.serialization.MessagesSizeCalculator;

import java.nio.ByteBuffer;
import java.time.Instant;

public interface CradleMessage {
Expand All @@ -31,8 +32,10 @@ public interface CradleMessage {
/**
* @return message content
*/
@Deprecated(since = "5.8.0")
byte[] getContent();

ByteBuffer getContentBuffer();
/**
* @return grpc protocol
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2025 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import com.exactpro.cradle.utils.CradleStorageException;
import com.exactpro.cradle.utils.MessageUtils;

import java.nio.ByteBuffer;
import java.util.Arrays;

/**
Expand Down Expand Up @@ -74,6 +75,12 @@ public byte[] getContent() {
return content;
}

@Override
public ByteBuffer getContentBuffer() {
if (content == null) { return null; }
return ByteBuffer.wrap(content);
}

@Override
public String getProtocol() {
return protocol;
Expand Down
Loading
Loading