From 8c882f64ea20da3d9b5292ab17fa765ac40281ee Mon Sep 17 00:00:00 2001 From: Miguel Fonseca Date: Thu, 9 Mar 2017 20:51:40 +0000 Subject: [PATCH] feat(NA) - Discard older metrics on buffer overflow. This tries to remove the oldest metric of the buffer once we reach max capacity and replace it with the new metrics instead of discarding those. --- .../client/core/buffer/AggregatedBuffer.java | 16 +++++++++++++++- .../client/core/buffer/StandardBuffer.java | 17 ++++++++++++++++- .../core/sender/BufferedMetricsSender.java | 6 ++---- .../sender/BufferedMetricsSenderAPITest.java | 4 +++- 4 files changed, 36 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/statful/client/core/buffer/AggregatedBuffer.java b/core/src/main/java/com/statful/client/core/buffer/AggregatedBuffer.java index fadfecd..9333e93 100644 --- a/core/src/main/java/com/statful/client/core/buffer/AggregatedBuffer.java +++ b/core/src/main/java/com/statful/client/core/buffer/AggregatedBuffer.java @@ -7,12 +7,14 @@ import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; /** * Buffer to store aggregated metrics. */ public class AggregatedBuffer implements MetricsBuffer { + private static final Logger LOGGER = Logger.getLogger(AggregatedBuffer.class.getName()); private Map>> buffer; private int maxBufferSize; private int flushSize; @@ -63,7 +65,19 @@ public final boolean addToBuffer(final String metric, final Aggregation aggregat aggregatedBuffer.put(aggregationFrequency.toString(), aggregatedFreqBuffer); buffer.put(aggregation.toString(), aggregatedBuffer); - return aggregatedFreqBuffer.offer(metric); + boolean inserted = aggregatedFreqBuffer.offer(metric); + if (!inserted) { + try { + LOGGER.fine("Buffer is full, trying to discard oldest metric."); + aggregatedFreqBuffer.remove(); + } catch (NoSuchElementException e) { + LOGGER.warning("Buffer has been emptied before trying to discard oldest entry."); + } finally { + inserted = aggregatedFreqBuffer.offer(metric); + } + } + + return inserted; } /** diff --git a/core/src/main/java/com/statful/client/core/buffer/StandardBuffer.java b/core/src/main/java/com/statful/client/core/buffer/StandardBuffer.java index 8a18e3c..573d5e0 100644 --- a/core/src/main/java/com/statful/client/core/buffer/StandardBuffer.java +++ b/core/src/main/java/com/statful/client/core/buffer/StandardBuffer.java @@ -4,13 +4,16 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; +import java.util.logging.Logger; /** * Buffer to store metrics. */ public class StandardBuffer implements MetricsBuffer { + private static final Logger LOGGER = Logger.getLogger(StandardBuffer.class.getName()); private ArrayBlockingQueue buffer; private int maxBufferSize; private int flushSize; @@ -40,7 +43,19 @@ public final ArrayBlockingQueue getBuffer() { * @return A {@link Boolean} with the success of the operation */ public final boolean addToBuffer(final String metric) { - return buffer.offer(metric); + boolean inserted = buffer.offer(metric); + if (!inserted) { + try { + LOGGER.fine("Buffer is full, trying to discard oldest metric."); + buffer.remove(); + } catch (NoSuchElementException e) { + LOGGER.warning("Buffer has been emptied before trying to discard oldest entry."); + } finally { + inserted = buffer.offer(metric); + } + } + + return inserted; } /** diff --git a/core/src/main/java/com/statful/client/core/sender/BufferedMetricsSender.java b/core/src/main/java/com/statful/client/core/sender/BufferedMetricsSender.java index a578eff..61d7d17 100644 --- a/core/src/main/java/com/statful/client/core/sender/BufferedMetricsSender.java +++ b/core/src/main/java/com/statful/client/core/sender/BufferedMetricsSender.java @@ -192,8 +192,7 @@ private int sanitizeSampleRate(final int sampleRate) { private void putRaw(final String metric) { boolean inserted = standardBuffer.addToBuffer(metric); if (!inserted) { - // We should discard older metrics instead - LOGGER.warning("The buffer is full, metric ignored!."); + LOGGER.warning("Failed to add metric to buffer."); } if (standardBuffer.isTimeToFlush()) { @@ -204,8 +203,7 @@ private void putRaw(final String metric) { private void putAggregatedRaw(final String metric, final Aggregation aggregation, final AggregationFrequency aggregationFrequency) { boolean inserted = aggregatedBuffer.addToBuffer(metric, aggregation, aggregationFrequency); if (!inserted) { - // We should discard older metrics instead - LOGGER.warning("The buffer is full, metric ignored!."); + LOGGER.warning("Failed to add metric to buffer."); } if (aggregatedBuffer.isTimeToFlush()) { diff --git a/core/src/test/java/com/statful/client/core/sender/BufferedMetricsSenderAPITest.java b/core/src/test/java/com/statful/client/core/sender/BufferedMetricsSenderAPITest.java index 97182e7..6a4a295 100644 --- a/core/src/test/java/com/statful/client/core/sender/BufferedMetricsSenderAPITest.java +++ b/core/src/test/java/com/statful/client/core/sender/BufferedMetricsSenderAPITest.java @@ -60,7 +60,7 @@ public void tearDown() { } @Test - public void shouldDiscardIfStandardBufferIsFull() { + public void shouldDiscardOlderMetricsIfStandardBufferIsFull() { // Given when(configuration.getFlushSize()).thenReturn(10000); @@ -76,6 +76,7 @@ public void shouldDiscardIfStandardBufferIsFull() { // Then List buffer = subject.getStandardBuffer(); assertEquals("MetricsBuffer should have 5000 metrics", 5000, buffer.size()); + assertTrue(buffer.contains("application.test_metric_overflow 500 123456789 100")); } @Test @@ -95,6 +96,7 @@ public void shouldDiscardIfAggregatedBufferIsFull() { // Then Map>> buffer = subject.getAggregatedBuffer(); assertEquals("MetricsBuffer should have 5000 metrics", 5000, buffer.get(Aggregation.AVG).get(AggregationFrequency.FREQ_10).size()); + assertTrue(buffer.get(Aggregation.AVG).get(AggregationFrequency.FREQ_10).contains("application.test_metric_overflow 500 123456789 100")); } @Test