From 0898a0d183635e97268d85f930d196ee2e121c98 Mon Sep 17 00:00:00 2001 From: Yusuke Yasuda Date: Mon, 27 Nov 2017 19:40:43 +0900 Subject: [PATCH 01/13] add server-side scan metrics --- Makefile | 1 + src/Scanner.java | 79 +++++++++++++++++++++- src/ServerSideScanMetrics.java | 119 +++++++++++++++++++++++++++++++++ 3 files changed, 197 insertions(+), 2 deletions(-) create mode 100644 src/ServerSideScanMetrics.java diff --git a/Makefile b/Makefile index 55af1ae4..3e49ca6b 100644 --- a/Makefile +++ b/Makefile @@ -131,6 +131,7 @@ asynchbase_SOURCES := \ src/SecureRpcHelper94.java \ src/SecureRpcHelper96.java \ src/ServerNotRunningYetException.java \ + src/ServerSideScanMetrics.java \ src/SingletonList.java \ src/SubstringComparator.java \ src/TableNotFoundException.java \ diff --git a/src/Scanner.java b/src/Scanner.java index 9d8dd983..1f5bd539 100644 --- a/src/Scanner.java +++ b/src/Scanner.java @@ -29,7 +29,11 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.hbase.async.generated.HBasePB; +import org.hbase.async.generated.MapReducePB; import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; @@ -198,6 +202,10 @@ public final class Scanner { private boolean moreRows; private boolean scannerClosedOnServer; + private boolean scan_metrics_enabled = false; + + private ServerSideScanMetrics scanMetrics; + /** * Constructor. * This byte array will NOT be copied. @@ -701,6 +709,18 @@ public void setTimeRange(final long min_timestamp, final long max_timestamp) { this.max_timestamp = max_timestamp; } + public void setScanMetricsEnabled(final boolean enabled) { + scan_metrics_enabled = enabled; + } + + public boolean isScanMetricsEnabled() { + return scan_metrics_enabled; + } + + public ServerSideScanMetrics getScanMetrics() { + return scanMetrics; + } + /** * Scans a number of rows. Calling this method is equivalent to: *
@@ -788,6 +808,7 @@ public Deferred>> call(final Object arg) {
               LOG.debug("Scanner " + Bytes.hex(scanner_id) + " opened on " + region);
             }
             if (resp != null) {
+              updateServerSideMetrics(resp.metrics);
               if (resp.rows == null) {
                 return scanFinished(!resp.more);
               }
@@ -1127,7 +1148,7 @@ RegionInfo currentRegion() {
    */
   HBaseRpc getNextRowsRequest() {
     if (get_next_rows_request == null) {
-      get_next_rows_request = new GetNextRowsRequest();
+      get_next_rows_request = new GetNextRowsRequest().withMetricsEnabled(this.isScanMetricsEnabled());
     }
     return get_next_rows_request;
   }
@@ -1187,6 +1208,8 @@ final static class Response {
 
     private final boolean scannerClosedOnServer;
 
+    private final Map metrics;
+
     Response(final long scanner_id,
              final ArrayList> rows,
              final boolean more, final boolean scannerClosedOnServer) {
@@ -1194,6 +1217,18 @@ final static class Response {
       this.rows = rows;
       this.more = more;
       this.scannerClosedOnServer = scannerClosedOnServer;
+      this.metrics = new HashMap();
+    }
+
+    Response(final long scanner_id,
+             final ArrayList> rows,
+             final boolean more, final boolean scannerClosedOnServer,
+             final Map metrics) {
+      this.scanner_id = scanner_id;
+      this.rows = rows;
+      this.more = more;
+      this.scannerClosedOnServer = scannerClosedOnServer;
+      this.metrics = metrics;
     }
 
     public String toString() {
@@ -1482,6 +1517,8 @@ public String toString() {
    */
   final class GetNextRowsRequest extends HBaseRpc {
 
+    boolean metrics_enabled = false;
+
     @Override
     byte[] method(final byte server_version) {
       return (server_version >= RegionClient.SERVER_VERSION_095_OR_ABOVE
@@ -1503,6 +1540,7 @@ ChannelBuffer serialize(final byte server_version) {
       final ScanRequest req = ScanRequest.newBuilder()
         .setScannerId(scanner_id)
         .setNumberOfRows(max_num_rows)
+        .setTrackScanMetrics(metrics_enabled)
         // Hardcoded these parameters to false since AsyncHBase cannot support them
         .setClientHandlesHeartbeats(false)
         .setClientHandlesPartials(false)
@@ -1523,8 +1561,36 @@ Response deserialize(final ChannelBuffer buf, final int cell_size) {
       if (rows == null) {
         return null;
       }
+      Map metrics = getScanMetrics(resp);
       final boolean scannerClosedOnServer = resp.hasMoreResultsInRegion() && !resp.getMoreResultsInRegion();
-      return new Response(resp.getScannerId(), rows, resp.getMoreResults(), scannerClosedOnServer);
+      return new Response(resp.getScannerId(), rows, resp.getMoreResults(), scannerClosedOnServer, metrics);
+    }
+
+    public GetNextRowsRequest withMetricsEnabled(boolean enabled) {
+      this.metrics_enabled = enabled;
+      return this;
+    }
+
+    private Map getScanMetrics(ScanResponse response) {
+      Map metricMap = new HashMap();
+      if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
+        return metricMap;
+      }
+
+      MapReducePB.ScanMetrics metrics = response.getScanMetrics();
+      int numberOfMetrics = metrics.getMetricsCount();
+      for (int i = 0; i < numberOfMetrics; i++) {
+        HBasePB.NameInt64Pair metricPair = metrics.getMetrics(i);
+        if (metricPair != null) {
+          String name = metricPair.getName();
+          Long value = metricPair.getValue();
+          if (name != null && value != null) {
+            metricMap.put(name, value);
+          }
+        }
+      }
+
+      return metricMap;
     }
 
     public String toString() {
@@ -1597,4 +1663,13 @@ public String toString() {
 
   }
 
+  private void updateServerSideMetrics(Map metrics) {
+    if (!metrics.isEmpty()) {
+      for (Map.Entry e : metrics.entrySet()) {
+        metrics.put(e.getKey(), e.getValue());
+        scanMetrics.addToCounter(e.getKey(), e.getValue());
+      }
+    }
+  }
+
 }
diff --git a/src/ServerSideScanMetrics.java b/src/ServerSideScanMetrics.java
new file mode 100644
index 00000000..efefea1c
--- /dev/null
+++ b/src/ServerSideScanMetrics.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
+ * This file is part of Async HBase.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *   - Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   - Redistributions in binary form must reproduce the above copyright notice,
+ *     this list of conditions and the following disclaimer in the documentation
+ *     and/or other materials provided with the distribution.
+ *   - Neither the name of the StumbleUpon nor the names of its contributors
+ *     may be used to endorse or promote products derived from this software
+ *     without specific prior written permission.
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+package org.hbase.async;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ServerSideScanMetrics {
+
+    /**
+     * Hash to hold the String -> Atomic Long mappings for each metric
+     */
+    private final Map counters = new HashMap();
+
+    /**
+     * Create a new counter with the specified name
+     * @param counterName
+     * @return {@link AtomicLong} instance for the counter with counterName
+     */
+    protected AtomicLong createCounter(String counterName) {
+        AtomicLong c = new AtomicLong(0);
+        counters.put(counterName, c);
+        return c;
+    }
+
+    public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
+    public static final String COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME = "ROWS_FILTERED";
+
+    /**
+     * number of rows filtered during scan RPC
+     */
+    public final AtomicLong countOfRowsFiltered = createCounter(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME);
+
+    /**
+     * number of rows scanned during scan RPC. Not every row scanned will be returned to the client
+     * since rows may be filtered.
+     */
+    public final AtomicLong countOfRowsScanned = createCounter(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+
+    /**
+     * @param counterName
+     * @param value
+     */
+    public void setCounter(String counterName, long value) {
+        AtomicLong c = this.counters.get(counterName);
+        if (c != null) {
+            c.set(value);
+        }
+    }
+
+    /**
+     * @param counterName
+     * @return true if a counter exists with the counterName
+     */
+    public boolean hasCounter(String counterName) {
+        return this.counters.containsKey(counterName);
+    }
+
+    /**
+     * @param counterName
+     * @return {@link AtomicLong} instance for this counter name, null if counter does not exist.
+     */
+    public AtomicLong getCounter(String counterName) {
+        return this.counters.get(counterName);
+    }
+
+    /**
+     * @param counterName
+     * @param delta
+     */
+    public void addToCounter(String counterName, long delta) {
+        AtomicLong c = this.counters.get(counterName);
+        if (c != null) {
+            c.addAndGet(delta);
+        }
+    }
+
+    /**
+     * Get all of the values since the last time this function was called. Calling this function will
+     * reset all AtomicLongs in the instance back to 0.
+     * @return A Map of String -> Long for metrics
+     */
+    public Map getMetricsMap() {
+        // Create a builder
+        ImmutableMap.Builder builder = ImmutableMap.builder();
+        // For every entry add the value and reset the AtomicLong back to zero
+        for (Map.Entry e : this.counters.entrySet()) {
+            builder.put(e.getKey(), e.getValue().getAndSet(0));
+        }
+        // Build the immutable map so that people can't mess around with it.
+        return builder.build();
+    }
+}

From 0570862058871ef533952a1ee9a4293d0a75377b Mon Sep 17 00:00:00 2001
From: Yusuke Yasuda 
Date: Tue, 28 Nov 2017 15:58:07 +0900
Subject: [PATCH 02/13] add client-side scan metrics still missing
 MILLIS_BETWEEN_NEXTS and RPC_RETRIES

---
 Makefile                       |  1 +
 src/ScanMetrics.java           | 77 ++++++++++++++++++++++++++++++++++
 src/Scanner.java               | 42 ++++++++++++++++++-
 src/ServerSideScanMetrics.java | 12 ++++++
 4 files changed, 130 insertions(+), 2 deletions(-)
 create mode 100644 src/ScanMetrics.java

diff --git a/Makefile b/Makefile
index 3e49ca6b..e2db48c1 100644
--- a/Makefile
+++ b/Makefile
@@ -125,6 +125,7 @@ asynchbase_SOURCES := \
 	src/RowFilter.java	\
 	src/RowLock.java	\
 	src/RowLockRequest.java	\
+	src/ScanMetrics.java	\
 	src/ScanFilter.java	\
 	src/Scanner.java	\
 	src/SecureRpcHelper.java	\
diff --git a/src/ScanMetrics.java b/src/ScanMetrics.java
new file mode 100644
index 00000000..73f70a10
--- /dev/null
+++ b/src/ScanMetrics.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
+ * This file is part of Async HBase.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *   - Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   - Redistributions in binary form must reproduce the above copyright notice,
+ *     this list of conditions and the following disclaimer in the documentation
+ *     and/or other materials provided with the distribution.
+ *   - Neither the name of the StumbleUpon nor the names of its contributors
+ *     may be used to endorse or promote products derived from this software
+ *     without specific prior written permission.
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+package org.hbase.async;
+
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ScanMetrics extends ServerSideScanMetrics {
+
+  public static final String RPC_CALLS_METRIC_NAME = "RPC_CALLS";
+  public static final String MILLIS_BETWEEN_NEXTS_METRIC_NAME = "MILLIS_BETWEEN_NEXTS";
+  public static final String NOT_SERVING_REGION_EXCEPTION_METRIC_NAME = "NOT_SERVING_REGION_EXCEPTION";
+  public static final String BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS";
+  public static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED";
+  public static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES";
+
+  /**
+   * number of RPC calls
+   */
+  public final AtomicLong countOfRPCcalls = createCounter(RPC_CALLS_METRIC_NAME);
+
+  /**
+   * sum of milliseconds between sequential next calls
+   */
+  public final AtomicLong sumOfMillisSecBetweenNexts = createCounter(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+
+  /**
+   * number of NotServingRegionException caught
+   */
+  public final AtomicLong countOfNSRE = createCounter(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME);
+
+  /**
+   * number of bytes in Result objects from region servers
+   */
+  public final AtomicLong countOfBytesInResults = createCounter(BYTES_IN_RESULTS_METRIC_NAME);
+
+  /**
+   * number of regions
+   * Starts with 1 because it is incremented when a scanner switches to a next region.
+   */
+  public final AtomicLong countOfRegions = createCounter(REGIONS_SCANNED_METRIC_NAME, 1);
+
+  /**
+   * number of RPC retries
+   */
+  public final AtomicLong countOfRPCRetries = createCounter(RPC_RETRIES_METRIC_NAME);
+
+  /**
+   * constructor
+   */
+  public ScanMetrics() {
+  }
+}
diff --git a/src/Scanner.java b/src/Scanner.java
index 1f5bd539..b1df3264 100644
--- a/src/Scanner.java
+++ b/src/Scanner.java
@@ -204,7 +204,7 @@ public final class Scanner {
 
   private boolean scan_metrics_enabled = false;
 
-  private ServerSideScanMetrics scanMetrics;
+  private ScanMetrics scanMetrics = new ScanMetrics();
 
   /**
    * Constructor.
@@ -717,7 +717,7 @@ public boolean isScanMetricsEnabled() {
     return scan_metrics_enabled;
   }
 
-  public ServerSideScanMetrics getScanMetrics() {
+  public ScanMetrics getScanMetrics() {
     return scanMetrics;
   }
 
@@ -760,6 +760,7 @@ public Deferred>> nextRows() {
     if (region == DONE) {  // We're already done scanning.
       return Deferred.fromResult(null);
     } else if (region == null) {  // We need to open the scanner first.
+      incRPCcallsMetrics();
       if (this.isReversed() && !this.isFirstReverseRegion()){
         return client.openReverseScanner(this)
                 .addCallbackDeferring(opened_scanner);
@@ -774,6 +775,7 @@ public Deferred>> nextRows() {
     if(scannerClosedOnServer) {
       return scanFinished(moreRows);
     }
+    incRPCcallsMetrics();
     // Need to silence this warning because the callback `got_next_row'
     // declares its return type to be Object, because its return value
     // may or may not be deferred.
@@ -850,6 +852,8 @@ public Object call(final Object response) {
           return scanFinished(resp != null && !resp.more);
         }
 
+        updateResultsMetrics(rows);
+
         final ArrayList lastrow = rows.get(rows.size() - 1);
         start_key = lastrow.get(0).key();
         return rows;
@@ -868,6 +872,7 @@ public Object call(final Object error) {
         final RegionInfo old_region = region;  // Save before invalidate().
         invalidate();  // If there was an error, don't assume we're still OK.
         if (error instanceof NotServingRegionException) {
+          incCountOfNSRE();
           // We'll resume scanning on another region, and we want to pick up
           // right after the last key we successfully returned.  Padding the
           // last key with an extra 0 gives us the next possible key.
@@ -915,6 +920,7 @@ public Deferred close() {
     if (region == null || region == DONE) {
       return Deferred.fromResult(null);
     }
+    incRPCcallsMetrics();
     return client.closeScanner(this).addBoth(closedCallback());
   }
 
@@ -1013,6 +1019,7 @@ private Deferred>> continueScanOnNextRegion() {
       LOG.debug("Scanner " + Bytes.hex(old_scanner_id) + " done scanning "
               + old_region);
     }
+    incRPCcallsMetrics();
     client.closeScanner(this).addCallback(new Callback() {
       public Object call(final Object arg) {
         if(LOG.isDebugEnabled()) {
@@ -1036,6 +1043,7 @@ public String toString() {
 
     scanner_id = 0xDEAD000AA000DEADL;   // Make debugging easier.
     invalidate();
+    incCountOfRegions();
     return nextRows();
   }
 
@@ -1672,4 +1680,34 @@ private void updateServerSideMetrics(Map metrics) {
     }
   }
 
+  private void incRPCcallsMetrics() {
+    if (isScanMetricsEnabled()) {
+      this.scanMetrics.countOfRPCcalls.incrementAndGet();
+    }
+  }
+
+  private void updateResultsMetrics(ArrayList> rows) {
+    if (isScanMetricsEnabled()) {
+      long resultSize = 0;
+      for (ArrayList row: rows) {
+        for (KeyValue cell: row) {
+          resultSize += cell.predictSerializedSize();
+        }
+      }
+      this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
+    }
+  }
+
+  private void incCountOfNSRE() {
+    if (isScanMetricsEnabled()) {
+      this.scanMetrics.countOfNSRE.incrementAndGet();
+    }
+  }
+
+  private void incCountOfRegions() {
+    if (isScanMetricsEnabled()) {
+      this.scanMetrics.countOfRegions.incrementAndGet();
+    }
+  }
+
 }
diff --git a/src/ServerSideScanMetrics.java b/src/ServerSideScanMetrics.java
index efefea1c..dccff4c8 100644
--- a/src/ServerSideScanMetrics.java
+++ b/src/ServerSideScanMetrics.java
@@ -49,6 +49,18 @@ protected AtomicLong createCounter(String counterName) {
         return c;
     }
 
+  /**
+   * Create a new counter with the specified name
+   * @param counterName
+   * @param initialValue
+   * @return
+   */
+  protected AtomicLong createCounter(String counterName, long initialValue) {
+    AtomicLong c = new AtomicLong(initialValue);
+    counters.put(counterName, c);
+    return c;
+  }
+
     public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
     public static final String COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME = "ROWS_FILTERED";
 

From 18f3bf4e0621ca3e02e2992dbb22c8679a869126 Mon Sep 17 00:00:00 2001
From: Yusuke Yasuda 
Date: Tue, 28 Nov 2017 17:02:18 +0900
Subject: [PATCH 03/13] fix server-side metrics collection place

---
 src/Scanner.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Scanner.java b/src/Scanner.java
index b1df3264..2b553ca4 100644
--- a/src/Scanner.java
+++ b/src/Scanner.java
@@ -810,7 +810,6 @@ public Deferred>> call(final Object arg) {
               LOG.debug("Scanner " + Bytes.hex(scanner_id) + " opened on " + region);
             }
             if (resp != null) {
-              updateServerSideMetrics(resp.metrics);
               if (resp.rows == null) {
                 return scanFinished(!resp.more);
               }
@@ -839,6 +838,7 @@ public Object call(final Object response) {
           rows = resp.rows;
           scannerClosedOnServer = resp.scannerClosedOnServer;
           moreRows = resp.more;
+          updateServerSideMetrics(resp.metrics);
         } else if (response instanceof ArrayList) {  // HBase 0.94 and before.
           @SuppressWarnings("unchecked")  // I 3>> generics.
           final ArrayList> r =

From 6e9a946f9452bf6dd65c79285a18a93ce80f0d9f Mon Sep 17 00:00:00 2001
From: Yusuke Yasuda 
Date: Tue, 28 Nov 2017 19:02:04 +0900
Subject: [PATCH 04/13] add MILLIS_BETWEEN_NEXTS metrics

---
 src/Scanner.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/src/Scanner.java b/src/Scanner.java
index 2b553ca4..0ec15655 100644
--- a/src/Scanner.java
+++ b/src/Scanner.java
@@ -204,6 +204,8 @@ public final class Scanner {
 
   private boolean scan_metrics_enabled = false;
 
+  private long last_next_timestamp = System.currentTimeMillis();
+
   private ScanMetrics scanMetrics = new ScanMetrics();
 
   /**
@@ -792,6 +794,8 @@ public Deferred>> nextRows() {
     opened_scanner =
       new Callback>>, Object>() {
           public Deferred>> call(final Object arg) {
+            long currentTime = System.currentTimeMillis();
+            updateSumOfMillisSecBetweenNexts(currentTime);
             final Response resp;
             if (arg instanceof Long) {
               scanner_id = (Long) arg;
@@ -831,6 +835,8 @@ public String toString() {
   private final Callback got_next_row =
     new Callback() {
       public Object call(final Object response) {
+        long currentTime = System.currentTimeMillis();
+        updateSumOfMillisSecBetweenNexts(currentTime);
         ArrayList> rows = null;
         Response resp = null;
         if (response instanceof Response) {  // HBase 0.95 and up
@@ -869,6 +875,8 @@ public String toString() {
   private final Callback nextRowErrback() {
     return new Callback() {
       public Object call(final Object error) {
+        long currentTime = System.currentTimeMillis();
+        updateSumOfMillisSecBetweenNexts(currentTime);
         final RegionInfo old_region = region;  // Save before invalidate().
         invalidate();  // If there was an error, don't assume we're still OK.
         if (error instanceof NotServingRegionException) {
@@ -928,6 +936,8 @@ public Deferred close() {
   private Callback closedCallback() {
     return new Callback() {
       public Object call(Object arg) {
+        long currentTime = System.currentTimeMillis();
+        updateSumOfMillisSecBetweenNexts(currentTime);
         if (arg instanceof Exception) {
           final Exception error = (Exception) arg;
           // NotServingRegionException:
@@ -1710,4 +1720,11 @@ private void incCountOfRegions() {
     }
   }
 
+  private void updateSumOfMillisSecBetweenNexts(long currentTime) {
+    if (isScanMetricsEnabled()) {
+      this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - last_next_timestamp);
+      last_next_timestamp = currentTime;
+    }
+  }
+
 }

From b09895d0c55b369fd990d2fc85b0993b3a736422 Mon Sep 17 00:00:00 2001
From: Yusuke Yasuda 
Date: Wed, 29 Nov 2017 16:36:11 +0900
Subject: [PATCH 05/13] use internal class for metrics classes

---
 Makefile                       |   2 -
 src/ScanMetrics.java           |  77 -------------------
 src/Scanner.java               | 115 +++++++++++++++++++++++++++--
 src/ServerSideScanMetrics.java | 131 ---------------------------------
 4 files changed, 108 insertions(+), 217 deletions(-)
 delete mode 100644 src/ScanMetrics.java
 delete mode 100644 src/ServerSideScanMetrics.java

diff --git a/Makefile b/Makefile
index e2db48c1..55af1ae4 100644
--- a/Makefile
+++ b/Makefile
@@ -125,14 +125,12 @@ asynchbase_SOURCES := \
 	src/RowFilter.java	\
 	src/RowLock.java	\
 	src/RowLockRequest.java	\
-	src/ScanMetrics.java	\
 	src/ScanFilter.java	\
 	src/Scanner.java	\
 	src/SecureRpcHelper.java	\
 	src/SecureRpcHelper94.java	\
 	src/SecureRpcHelper96.java	\
 	src/ServerNotRunningYetException.java	\
-	src/ServerSideScanMetrics.java  \
 	src/SingletonList.java	\
 	src/SubstringComparator.java	\
 	src/TableNotFoundException.java	\
diff --git a/src/ScanMetrics.java b/src/ScanMetrics.java
deleted file mode 100644
index 73f70a10..00000000
--- a/src/ScanMetrics.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
- * This file is part of Async HBase.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *   - Redistributions of source code must retain the above copyright notice,
- *     this list of conditions and the following disclaimer.
- *   - Redistributions in binary form must reproduce the above copyright notice,
- *     this list of conditions and the following disclaimer in the documentation
- *     and/or other materials provided with the distribution.
- *   - Neither the name of the StumbleUpon nor the names of its contributors
- *     may be used to endorse or promote products derived from this software
- *     without specific prior written permission.
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-package org.hbase.async;
-
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ScanMetrics extends ServerSideScanMetrics {
-
-  public static final String RPC_CALLS_METRIC_NAME = "RPC_CALLS";
-  public static final String MILLIS_BETWEEN_NEXTS_METRIC_NAME = "MILLIS_BETWEEN_NEXTS";
-  public static final String NOT_SERVING_REGION_EXCEPTION_METRIC_NAME = "NOT_SERVING_REGION_EXCEPTION";
-  public static final String BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS";
-  public static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED";
-  public static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES";
-
-  /**
-   * number of RPC calls
-   */
-  public final AtomicLong countOfRPCcalls = createCounter(RPC_CALLS_METRIC_NAME);
-
-  /**
-   * sum of milliseconds between sequential next calls
-   */
-  public final AtomicLong sumOfMillisSecBetweenNexts = createCounter(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
-
-  /**
-   * number of NotServingRegionException caught
-   */
-  public final AtomicLong countOfNSRE = createCounter(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME);
-
-  /**
-   * number of bytes in Result objects from region servers
-   */
-  public final AtomicLong countOfBytesInResults = createCounter(BYTES_IN_RESULTS_METRIC_NAME);
-
-  /**
-   * number of regions
-   * Starts with 1 because it is incremented when a scanner switches to a next region.
-   */
-  public final AtomicLong countOfRegions = createCounter(REGIONS_SCANNED_METRIC_NAME, 1);
-
-  /**
-   * number of RPC retries
-   */
-  public final AtomicLong countOfRPCRetries = createCounter(RPC_RETRIES_METRIC_NAME);
-
-  /**
-   * constructor
-   */
-  public ScanMetrics() {
-  }
-}
diff --git a/src/Scanner.java b/src/Scanner.java
index 0ec15655..3f038622 100644
--- a/src/Scanner.java
+++ b/src/Scanner.java
@@ -32,6 +32,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableMap;
 import org.hbase.async.generated.HBasePB;
 import org.hbase.async.generated.MapReducePB;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -1684,15 +1685,18 @@ public String toString() {
   private void updateServerSideMetrics(Map metrics) {
     if (!metrics.isEmpty()) {
       for (Map.Entry e : metrics.entrySet()) {
-        metrics.put(e.getKey(), e.getValue());
-        scanMetrics.addToCounter(e.getKey(), e.getValue());
+        if (e.getKey().equals(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)) {
+          scanMetrics.count_of_rows_scanned += e.getValue();
+        } else if (e.getKey().equals(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME)) {
+          scanMetrics.count_of_rows_filtered += e.getValue();
+        }
       }
     }
   }
 
   private void incRPCcallsMetrics() {
     if (isScanMetricsEnabled()) {
-      this.scanMetrics.countOfRPCcalls.incrementAndGet();
+      this.scanMetrics.count_of_rpc_calls += 1;
     }
   }
 
@@ -1704,27 +1708,124 @@ private void updateResultsMetrics(ArrayList> rows) {
           resultSize += cell.predictSerializedSize();
         }
       }
-      this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
+      this.scanMetrics.count_of_bytes_in_results += resultSize;
     }
   }
 
   private void incCountOfNSRE() {
     if (isScanMetricsEnabled()) {
-      this.scanMetrics.countOfNSRE.incrementAndGet();
+      this.scanMetrics.count_of_nsre += 1;
     }
   }
 
   private void incCountOfRegions() {
     if (isScanMetricsEnabled()) {
-      this.scanMetrics.countOfRegions.incrementAndGet();
+      this.scanMetrics.count_of_regions += 1;
     }
   }
 
   private void updateSumOfMillisSecBetweenNexts(long currentTime) {
     if (isScanMetricsEnabled()) {
-      this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - last_next_timestamp);
+      this.scanMetrics.sum_of_millis_sec_between_nexts += (currentTime - last_next_timestamp);
       last_next_timestamp = currentTime;
     }
   }
 
+  private static class ServerSideScanMetrics {
+
+    public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
+    public static final String COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME = "ROWS_FILTERED";
+
+    protected long count_of_rows_scanned = 0;
+    protected long count_of_rows_filtered = 0;
+
+    /**
+     * number of rows scanned during scan RPC. Not every row scanned will be returned to the client
+     * since rows may be filtered.
+     */
+    public long getCountOfRowsScanned() { return count_of_rows_scanned; };
+
+    /**
+     * number of rows filtered during scan RPC
+     */
+    public long getCountOfRowsFiltered() { return count_of_rows_filtered; }
+
+  }
+
+  public static class ScanMetrics extends ServerSideScanMetrics {
+
+    public static final String RPC_CALLS_METRIC_NAME = "RPC_CALLS";
+    public static final String MILLIS_BETWEEN_NEXTS_METRIC_NAME = "MILLIS_BETWEEN_NEXTS";
+    public static final String NOT_SERVING_REGION_EXCEPTION_METRIC_NAME = "NOT_SERVING_REGION_EXCEPTION";
+    public static final String BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS";
+    public static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED";
+    public static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES";
+
+    private long count_of_rpc_calls = 0;
+
+    private long sum_of_millis_sec_between_nexts = 0;
+
+    private long count_of_nsre = 0;
+
+    private long  count_of_bytes_in_results = 0;
+
+    /**
+     * Starts with 1 because it is incremented when a scanner switches to a next region.
+     */
+    private long count_of_regions = 1;
+
+    private long count_of_rpc_retries = 0;
+
+    /**
+     * number of RPC calls
+     */
+    public long getCountOfRPCcalls() { return count_of_rpc_calls; }
+
+    /**
+     * sum of milliseconds between sequential next calls
+     */
+    public long getSumOfMillisSecBetweenNexts() { return sum_of_millis_sec_between_nexts; }
+
+    /**
+     * number of NotServingRegionException caught
+     */
+    public long getCountOfNSRE() { return count_of_nsre; }
+
+    /**
+     * number of bytes in Result objects from region servers
+     */
+    public long getCountOfBytesInResults() { return count_of_bytes_in_results; }
+
+    /**
+     * number of regions
+     * Starts with 1 because it is incremented when a scanner switches to a next region.
+     */
+    public long getCountOfRegions() { return count_of_regions; };
+
+    /**
+     * number of RPC retries
+     */
+    public long getCountOfRPCRetries() { return count_of_rpc_retries; }
+
+    /**
+     * constructor
+     */
+    public ScanMetrics() {
+    }
+
+    public Map getMetricsMap() {
+      ImmutableMap.Builder builder = ImmutableMap.builder();
+      builder.put(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, count_of_rows_scanned);
+      builder.put(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, count_of_rows_filtered);
+      builder.put(RPC_CALLS_METRIC_NAME, count_of_rpc_calls);
+      builder.put(MILLIS_BETWEEN_NEXTS_METRIC_NAME, sum_of_millis_sec_between_nexts);
+      builder.put(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME, count_of_nsre);
+      builder.put(BYTES_IN_RESULTS_METRIC_NAME, count_of_bytes_in_results);
+      builder.put(REGIONS_SCANNED_METRIC_NAME, count_of_regions);
+      builder.put(RPC_RETRIES_METRIC_NAME, count_of_rpc_retries);
+      return builder.build();
+    }
+  }
+
+
 }
diff --git a/src/ServerSideScanMetrics.java b/src/ServerSideScanMetrics.java
deleted file mode 100644
index dccff4c8..00000000
--- a/src/ServerSideScanMetrics.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
- * This file is part of Async HBase.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *   - Redistributions of source code must retain the above copyright notice,
- *     this list of conditions and the following disclaimer.
- *   - Redistributions in binary form must reproduce the above copyright notice,
- *     this list of conditions and the following disclaimer in the documentation
- *     and/or other materials provided with the distribution.
- *   - Neither the name of the StumbleUpon nor the names of its contributors
- *     may be used to endorse or promote products derived from this software
- *     without specific prior written permission.
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-package org.hbase.async;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ServerSideScanMetrics {
-
-    /**
-     * Hash to hold the String -> Atomic Long mappings for each metric
-     */
-    private final Map counters = new HashMap();
-
-    /**
-     * Create a new counter with the specified name
-     * @param counterName
-     * @return {@link AtomicLong} instance for the counter with counterName
-     */
-    protected AtomicLong createCounter(String counterName) {
-        AtomicLong c = new AtomicLong(0);
-        counters.put(counterName, c);
-        return c;
-    }
-
-  /**
-   * Create a new counter with the specified name
-   * @param counterName
-   * @param initialValue
-   * @return
-   */
-  protected AtomicLong createCounter(String counterName, long initialValue) {
-    AtomicLong c = new AtomicLong(initialValue);
-    counters.put(counterName, c);
-    return c;
-  }
-
-    public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
-    public static final String COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME = "ROWS_FILTERED";
-
-    /**
-     * number of rows filtered during scan RPC
-     */
-    public final AtomicLong countOfRowsFiltered = createCounter(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME);
-
-    /**
-     * number of rows scanned during scan RPC. Not every row scanned will be returned to the client
-     * since rows may be filtered.
-     */
-    public final AtomicLong countOfRowsScanned = createCounter(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
-
-    /**
-     * @param counterName
-     * @param value
-     */
-    public void setCounter(String counterName, long value) {
-        AtomicLong c = this.counters.get(counterName);
-        if (c != null) {
-            c.set(value);
-        }
-    }
-
-    /**
-     * @param counterName
-     * @return true if a counter exists with the counterName
-     */
-    public boolean hasCounter(String counterName) {
-        return this.counters.containsKey(counterName);
-    }
-
-    /**
-     * @param counterName
-     * @return {@link AtomicLong} instance for this counter name, null if counter does not exist.
-     */
-    public AtomicLong getCounter(String counterName) {
-        return this.counters.get(counterName);
-    }
-
-    /**
-     * @param counterName
-     * @param delta
-     */
-    public void addToCounter(String counterName, long delta) {
-        AtomicLong c = this.counters.get(counterName);
-        if (c != null) {
-            c.addAndGet(delta);
-        }
-    }
-
-    /**
-     * Get all of the values since the last time this function was called. Calling this function will
-     * reset all AtomicLongs in the instance back to 0.
-     * @return A Map of String -> Long for metrics
-     */
-    public Map getMetricsMap() {
-        // Create a builder
-        ImmutableMap.Builder builder = ImmutableMap.builder();
-        // For every entry add the value and reset the AtomicLong back to zero
-        for (Map.Entry e : this.counters.entrySet()) {
-            builder.put(e.getKey(), e.getValue().getAndSet(0));
-        }
-        // Build the immutable map so that people can't mess around with it.
-        return builder.build();
-    }
-}

From ac32ea7a576da281125a59035f19436bf45a3bcf Mon Sep 17 00:00:00 2001
From: Yusuke Yasuda 
Date: Thu, 30 Nov 2017 15:57:59 +0900
Subject: [PATCH 06/13] update count of bytes when scanner is opened

---
 src/Scanner.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/Scanner.java b/src/Scanner.java
index 3f038622..92a16534 100644
--- a/src/Scanner.java
+++ b/src/Scanner.java
@@ -818,6 +818,7 @@ public Deferred>> call(final Object arg) {
               if (resp.rows == null) {
                 return scanFinished(!resp.more);
               }
+              updateResultsMetrics(resp.rows);
               return Deferred.fromResult(resp.rows);
             }
             return nextRows();  // Restart the call.

From bd756e7b8a285dd217d913d812a7824b1dd8ee2a Mon Sep 17 00:00:00 2001
From: Yusuke Yasuda 
Date: Thu, 30 Nov 2017 17:27:29 +0900
Subject: [PATCH 07/13] collect server-side metrics when scanner is opened

---
 src/Scanner.java | 42 +++++++++++++++++++++++++++++++++++++-----
 1 file changed, 37 insertions(+), 5 deletions(-)

diff --git a/src/Scanner.java b/src/Scanner.java
index 92a16534..39cdda4c 100644
--- a/src/Scanner.java
+++ b/src/Scanner.java
@@ -815,6 +815,7 @@ public Deferred>> call(final Object arg) {
               LOG.debug("Scanner " + Bytes.hex(scanner_id) + " opened on " + region);
             }
             if (resp != null) {
+              updateServerSideMetrics(resp.metrics);
               if (resp.rows == null) {
                 return scanFinished(!resp.more);
               }
@@ -1177,7 +1178,7 @@ HBaseRpc getNextRowsRequest() {
    * Returns an RPC to open this scanner.
    */
   HBaseRpc getOpenRequest() {
-    return new OpenScannerRequest();
+    return new OpenScannerRequest().withMetricsEnabled(this.isScanMetricsEnabled());
   }
 
   /**
@@ -1187,7 +1188,7 @@ HBaseRpc getOpenRequest() {
    * @param region_start_key region's start key
    */
   HBaseRpc getOpenRequestForReverseScan(final byte[] region_start_key) {
-    return new OpenScannerRequest(table, region_start_key);
+    return new OpenScannerRequest(table, region_start_key).withMetricsEnabled(this.isScanMetricsEnabled());
   }
 
   /**
@@ -1310,6 +1311,8 @@ private ArrayList> getRows(final ScanResponse resp,
    */
   final class OpenScannerRequest extends HBaseRpc {
 
+    boolean metrics_enabled = false;
+
     /**
      * Default constructor that is used for every forward Scanner and
      * for the first Scanner in a reverse Scan
@@ -1440,6 +1443,7 @@ ChannelBuffer serialize(final byte server_version) {
         .setRegion(region.toProtobuf())
         .setScan(scan.build())
         .setNumberOfRows(max_num_rows)
+        .setTrackScanMetrics(metrics_enabled)
         // Hardcoded these parameters to false since AsyncHBase cannot support them
         .setClientHandlesHeartbeats(false)
         .setClientHandlesPartials(false)
@@ -1518,10 +1522,38 @@ Response deserialize(final ChannelBuffer buf, final int cell_size) {
         throw new InvalidResponseException("Scan RPC response doesn't contain a"
                                            + " scanner ID", resp);
       }
+      Map metrics = getServerSideScanMetrics(resp);
       final boolean scannerClosedOnServer = resp.hasMoreResultsInRegion() && !resp.getMoreResultsInRegion();
       return new Response(resp.getScannerId(),
                           getRows(resp, buf, cell_size),
-                          resp.getMoreResults(), scannerClosedOnServer);
+                          resp.getMoreResults(), scannerClosedOnServer, metrics);
+    }
+
+    public OpenScannerRequest withMetricsEnabled(boolean enabled) {
+      this.metrics_enabled = enabled;
+      return this;
+    }
+
+    private Map getServerSideScanMetrics(ScanResponse response) {
+      Map metricMap = new HashMap();
+      if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
+        return metricMap;
+      }
+
+      MapReducePB.ScanMetrics metrics = response.getScanMetrics();
+      int numberOfMetrics = metrics.getMetricsCount();
+      for (int i = 0; i < numberOfMetrics; i++) {
+        HBasePB.NameInt64Pair metricPair = metrics.getMetrics(i);
+        if (metricPair != null) {
+          String name = metricPair.getName();
+          Long value = metricPair.getValue();
+          if (name != null && value != null) {
+            metricMap.put(name, value);
+          }
+        }
+      }
+
+      return metricMap;
     }
 
     public String toString() {
@@ -1581,7 +1613,7 @@ Response deserialize(final ChannelBuffer buf, final int cell_size) {
       if (rows == null) {
         return null;
       }
-      Map metrics = getScanMetrics(resp);
+      Map metrics = getServerSideScanMetrics(resp);
       final boolean scannerClosedOnServer = resp.hasMoreResultsInRegion() && !resp.getMoreResultsInRegion();
       return new Response(resp.getScannerId(), rows, resp.getMoreResults(), scannerClosedOnServer, metrics);
     }
@@ -1591,7 +1623,7 @@ public GetNextRowsRequest withMetricsEnabled(boolean enabled) {
       return this;
     }
 
-    private Map getScanMetrics(ScanResponse response) {
+    private Map getServerSideScanMetrics(ScanResponse response) {
       Map metricMap = new HashMap();
       if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
         return metricMap;

From a6633e065fe0790d950b514b89a13176e31ad7b7 Mon Sep 17 00:00:00 2001
From: Yusuke Yasuda 
Date: Thu, 30 Nov 2017 18:05:16 +0900
Subject: [PATCH 08/13] add tests for scan metrics

---
 test/TestIntegration.java | 63 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/test/TestIntegration.java b/test/TestIntegration.java
index c04c9bfd..bb6bd0c4 100644
--- a/test/TestIntegration.java
+++ b/test/TestIntegration.java
@@ -2175,6 +2175,69 @@ public void reverseAndForwardScanMoreThanMaxKVs() throws Exception{
 
   }
 
+  @Test
+  public void scanMetrics() throws Exception {
+    final String table6 = args[0] + "6";
+    createOrTruncateTable(client, table6, family);
+
+    // Split into 4 regions.
+    splitTable(table6, "aaa");
+    splitTable(table6, "bbb");
+    splitTable(table6, "ccc");
+    alterTableStatus(table6);
+
+    // Put 3 rows in total, one row per each region.
+    client.setFlushInterval(FAST_FLUSH);
+    final PutRequest put1 = new PutRequest(table6, "aaa", family, "testQualifier", "testValue");
+    client.put(put1).join();
+    final PutRequest put2 = new PutRequest(table6, "bbb", family, "testQualifier", "testValue");
+    client.put(put2).join();
+    final PutRequest put3 = new PutRequest(table6, "ccc", family, "testQualifier", "testValue");
+    client.put(put3).join();
+
+    // Create forward scanner with metrics enabled.
+    Scanner scanner = client.newScanner(table6);
+    scanner.setScanMetricsEnabled(true);
+    scanner.setStartKey("aaa");
+    scanner.setStopKey("ddd");
+
+    long prevBytesInResult = 0;
+
+    ArrayList> row1 = scanner.nextRows().join();
+    assertSizeIs(1, row1);
+    Scanner.ScanMetrics metrics1 = scanner.getScanMetrics();
+    assertEquals("incorrect count of regions", 1, metrics1.getCountOfRegions());
+    assertEquals("incorrect count of rows scanned", 1, metrics1.getCountOfRowsScanned()); // 1 row scanned
+    assertEquals("incorrect count of RPC calls", 1, metrics1.getCountOfRPCcalls()); // 1 open
+    assertTrue("incorrect count of bytes in results", metrics1.getCountOfBytesInResults() > prevBytesInResult);
+    prevBytesInResult = metrics1.getCountOfBytesInResults();
+
+    ArrayList> row2 = scanner.nextRows().join();
+    assertSizeIs(1, row2);
+    Scanner.ScanMetrics metrics2 = scanner.getScanMetrics();
+    assertEquals("incorrect count of regions", 2, metrics2.getCountOfRegions()); // continue to next region
+    assertEquals("incorrect count of rows scanned", 2, metrics2.getCountOfRowsScanned()); // 1 row scanned
+    assertEquals("incorrect count of RPC calls", 3, metrics2.getCountOfRPCcalls()); // 1 close + 1 open
+    assertTrue("incorrect count of bytes in results", metrics2.getCountOfBytesInResults() > prevBytesInResult);
+    prevBytesInResult = metrics2.getCountOfBytesInResults();
+
+    ArrayList> row3 = scanner.nextRows().join();
+    assertSizeIs(1, row3);
+    Scanner.ScanMetrics metrics3 = scanner.getScanMetrics();
+    assertEquals("incorrect count of regions", 3, metrics3.getCountOfRegions()); // continue to next region
+    assertEquals("incorrect count of rows scanned", 3, metrics3.getCountOfRowsScanned()); // 1 row scanned
+    assertEquals("incorrect count of RPC calls", 5, metrics3.getCountOfRPCcalls());  // 1 close + 1 open
+    assertTrue("incorrect count of bytes in results", metrics3.getCountOfBytesInResults() > prevBytesInResult);
+    prevBytesInResult = metrics3.getCountOfBytesInResults();
+
+    scanner.close().join();
+    Scanner.ScanMetrics metrics4 = scanner.getScanMetrics();
+    assertEquals("incorrect count of regions", 3, metrics3.getCountOfRegions());
+    assertEquals("incorrect count of rows scanned", 3, metrics2.getCountOfRowsScanned());
+    assertEquals("incorrect count of RPC calls", 6, metrics3.getCountOfRPCcalls()); // 1 close
+    assertTrue("incorrect count of bytes in results", metrics3.getCountOfBytesInResults() == prevBytesInResult);
+  }
+
   /** Regression test for issue #2. */
   @Test
   public void regression2() throws Exception {

From f0e48b96de6e47383ea5bc40c978b14ca91a27e3 Mon Sep 17 00:00:00 2001
From: Yusuke Yasuda 
Date: Fri, 1 Dec 2017 12:53:45 +0900
Subject: [PATCH 09/13] update test to include GetNextRowsRequest

---
 test/TestIntegration.java | 97 ++++++++++++++++++++++++---------------
 1 file changed, 60 insertions(+), 37 deletions(-)

diff --git a/test/TestIntegration.java b/test/TestIntegration.java
index bb6bd0c4..df58932a 100644
--- a/test/TestIntegration.java
+++ b/test/TestIntegration.java
@@ -33,7 +33,6 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
@@ -2186,12 +2185,17 @@ public void scanMetrics() throws Exception {
     splitTable(table6, "ccc");
     alterTableStatus(table6);
 
-    // Put 3 rows in total, one row per each region.
+    // Put 5 rows to 3 regions.
     client.setFlushInterval(FAST_FLUSH);
-    final PutRequest put1 = new PutRequest(table6, "aaa", family, "testQualifier", "testValue");
-    client.put(put1).join();
-    final PutRequest put2 = new PutRequest(table6, "bbb", family, "testQualifier", "testValue");
-    client.put(put2).join();
+    final PutRequest put_a1 = new PutRequest(table6, "aaa1", family, "testQualifier", "testValue");
+    client.put(put_a1).join();
+    client.setFlushInterval(FAST_FLUSH);
+    final PutRequest put_a2 = new PutRequest(table6, "aaa2", family, "testQualifier", "testValue");
+    client.put(put_a2).join();
+    final PutRequest put_b1 = new PutRequest(table6, "bbb1", family, "testQualifier", "testValue");
+    client.put(put_b1).join();
+    final PutRequest put_b2 = new PutRequest(table6, "bbb2", family, "testQualifier", "testValue");
+    client.put(put_b2).join();
     final PutRequest put3 = new PutRequest(table6, "ccc", family, "testQualifier", "testValue");
     client.put(put3).join();
 
@@ -2203,39 +2207,58 @@ public void scanMetrics() throws Exception {
 
     long prevBytesInResult = 0;
 
-    ArrayList> row1 = scanner.nextRows().join();
-    assertSizeIs(1, row1);
-    Scanner.ScanMetrics metrics1 = scanner.getScanMetrics();
-    assertEquals("incorrect count of regions", 1, metrics1.getCountOfRegions());
-    assertEquals("incorrect count of rows scanned", 1, metrics1.getCountOfRowsScanned()); // 1 row scanned
-    assertEquals("incorrect count of RPC calls", 1, metrics1.getCountOfRPCcalls()); // 1 open
-    assertTrue("incorrect count of bytes in results", metrics1.getCountOfBytesInResults() > prevBytesInResult);
-    prevBytesInResult = metrics1.getCountOfBytesInResults();
-
-    ArrayList> row2 = scanner.nextRows().join();
-    assertSizeIs(1, row2);
-    Scanner.ScanMetrics metrics2 = scanner.getScanMetrics();
-    assertEquals("incorrect count of regions", 2, metrics2.getCountOfRegions()); // continue to next region
-    assertEquals("incorrect count of rows scanned", 2, metrics2.getCountOfRowsScanned()); // 1 row scanned
-    assertEquals("incorrect count of RPC calls", 3, metrics2.getCountOfRPCcalls()); // 1 close + 1 open
-    assertTrue("incorrect count of bytes in results", metrics2.getCountOfBytesInResults() > prevBytesInResult);
-    prevBytesInResult = metrics2.getCountOfBytesInResults();
-
-    ArrayList> row3 = scanner.nextRows().join();
-    assertSizeIs(1, row3);
-    Scanner.ScanMetrics metrics3 = scanner.getScanMetrics();
-    assertEquals("incorrect count of regions", 3, metrics3.getCountOfRegions()); // continue to next region
-    assertEquals("incorrect count of rows scanned", 3, metrics3.getCountOfRowsScanned()); // 1 row scanned
-    assertEquals("incorrect count of RPC calls", 5, metrics3.getCountOfRPCcalls());  // 1 close + 1 open
-    assertTrue("incorrect count of bytes in results", metrics3.getCountOfBytesInResults() > prevBytesInResult);
-    prevBytesInResult = metrics3.getCountOfBytesInResults();
+    ArrayList> row_a1 = scanner.nextRows(1).join();
+    assertSizeIs(1, row_a1);
+    Scanner.ScanMetrics metrics_a1 = scanner.getScanMetrics();
+    assertEquals("incorrect count of regions", 1, metrics_a1.getCountOfRegions());
+    assertEquals("incorrect count of rows scanned", 1, metrics_a1.getCountOfRowsScanned()); // + 1 row scanned
+    assertEquals("incorrect count of RPC calls", 1, metrics_a1.getCountOfRPCcalls()); // + 1 open
+    assertTrue("incorrect count of bytes in results", metrics_a1.getCountOfBytesInResults() > prevBytesInResult);
+    assertEquals("incorrect count of NotServingRegionException", 0, metrics_a1.getCountOfNSRE());
+    prevBytesInResult = metrics_a1.getCountOfBytesInResults();
+
+    ArrayList> row_a2 = scanner.nextRows(1).join();
+    assertSizeIs(1, row_a2);
+    Scanner.ScanMetrics metrics_a2 = scanner.getScanMetrics();
+    assertEquals("incorrect count of regions", 1, metrics_a2.getCountOfRegions());
+    assertEquals("incorrect count of rows scanned", 2, metrics_a2.getCountOfRowsScanned()); // + 1 row scanned
+    assertEquals("incorrect count of RPC calls", 2, metrics_a2.getCountOfRPCcalls()); // + 1 next
+    assertTrue("incorrect count of bytes in results", metrics_a2.getCountOfBytesInResults() > prevBytesInResult);
+    prevBytesInResult = metrics_a2.getCountOfBytesInResults();
+
+    ArrayList> row_b1 = scanner.nextRows(1).join();
+    assertSizeIs(1, row_b1);
+    Scanner.ScanMetrics metrics_b1 = scanner.getScanMetrics();
+    assertEquals("incorrect count of regions", 2, metrics_b1.getCountOfRegions()); // + next region
+    assertEquals("incorrect count of rows scanned", 3, metrics_b1.getCountOfRowsScanned()); // + 1 row scanned
+    assertEquals("incorrect count of RPC calls", 5, metrics_b1.getCountOfRPCcalls()); // + 1 empty next + 1 close + 1 open
+    assertTrue("incorrect count of bytes in results", metrics_b1.getCountOfBytesInResults() > prevBytesInResult);
+    prevBytesInResult = metrics_b1.getCountOfBytesInResults();
+
+    ArrayList> row_b2 = scanner.nextRows(1).join();
+    assertSizeIs(1, row_b2);
+    Scanner.ScanMetrics metrics_b2 = scanner.getScanMetrics();
+    assertEquals("incorrect count of regions", 2, metrics_b2.getCountOfRegions());
+    assertEquals("incorrect count of rows scanned", 4, metrics_b2.getCountOfRowsScanned()); // + 1 row scanned
+    assertEquals("incorrect count of RPC calls", 6, metrics_b2.getCountOfRPCcalls()); // + 1 next
+    assertTrue("incorrect count of bytes in results", metrics_b2.getCountOfBytesInResults() > prevBytesInResult);
+    prevBytesInResult = metrics_b2.getCountOfBytesInResults();
+
+    ArrayList> row_c = scanner.nextRows().join();
+    assertSizeIs(1, row_c);
+    Scanner.ScanMetrics metrics_c = scanner.getScanMetrics();
+    assertEquals("incorrect count of regions", 3, metrics_c.getCountOfRegions()); // + next region
+    assertEquals("incorrect count of rows scanned", 5, metrics_c.getCountOfRowsScanned()); // + 1 row scanned
+    assertEquals("incorrect count of RPC calls", 9, metrics_c.getCountOfRPCcalls()); // + 1 empty next + 1 close + 1 open
+    assertTrue("incorrect count of bytes in results", metrics_c.getCountOfBytesInResults() > prevBytesInResult);
+    prevBytesInResult = metrics_c.getCountOfBytesInResults();
 
     scanner.close().join();
-    Scanner.ScanMetrics metrics4 = scanner.getScanMetrics();
-    assertEquals("incorrect count of regions", 3, metrics3.getCountOfRegions());
-    assertEquals("incorrect count of rows scanned", 3, metrics2.getCountOfRowsScanned());
-    assertEquals("incorrect count of RPC calls", 6, metrics3.getCountOfRPCcalls()); // 1 close
-    assertTrue("incorrect count of bytes in results", metrics3.getCountOfBytesInResults() == prevBytesInResult);
+    Scanner.ScanMetrics metrics_final = scanner.getScanMetrics();
+    assertEquals("incorrect count of regions", 3, metrics_final.getCountOfRegions());
+    assertEquals("incorrect count of rows scanned", 5, metrics_final.getCountOfRowsScanned());
+    assertEquals("incorrect count of RPC calls", 10, metrics_final.getCountOfRPCcalls()); // + 1 close
+    assertTrue("incorrect count of bytes in results", metrics_final.getCountOfBytesInResults() == prevBytesInResult);
   }
 
   /** Regression test for issue #2. */

From 1dbe27cd6db9805f48c0bfe50a9ec4433d1385f3 Mon Sep 17 00:00:00 2001
From: Yusuke Yasuda 
Date: Fri, 1 Dec 2017 14:41:48 +0900
Subject: [PATCH 10/13] add tests of scan metrics of filtered rows

---
 test/TestIntegration.java | 36 ++++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/test/TestIntegration.java b/test/TestIntegration.java
index df58932a..7216aaad 100644
--- a/test/TestIntegration.java
+++ b/test/TestIntegration.java
@@ -2174,6 +2174,7 @@ public void reverseAndForwardScanMoreThanMaxKVs() throws Exception{
 
   }
 
+  /** Scan metrics tests.  */
   @Test
   public void scanMetrics() throws Exception {
     final String table6 = args[0] + "6";
@@ -2261,6 +2262,41 @@ public void scanMetrics() throws Exception {
     assertTrue("incorrect count of bytes in results", metrics_final.getCountOfBytesInResults() == prevBytesInResult);
   }
 
+  /** Scan metrics of filtered rows tests. */
+  @Test
+  public void scanMetricsFilter() throws Exception {
+    client.setFlushInterval(FAST_FLUSH);
+    // Keep only rows with a column qualifier that starts with "qa".
+    final PutRequest put1 = new PutRequest(table, "cpf1", family, "qa1", "v1");
+    final PutRequest put2 = new PutRequest(table, "cpf2", family, "qb2", "v2");
+    final PutRequest put3 = new PutRequest(table, "cpf3", family, "qb3", "v3");
+    final PutRequest put4 = new PutRequest(table, "cpf4", family, "qa4", "v4");
+    Deferred.group(Deferred.group(client.put(put1), client.put(put2)),
+        Deferred.group(client.put(put3), client.put(put4))).join();
+    final Scanner scanner = client.newScanner(table);
+    scanner.setFamily(family);
+    scanner.setStartKey("cpf1");
+    scanner.setStopKey("cpf5");
+    scanner.setFilter(new ColumnPrefixFilter("qa"));
+    scanner.setScanMetricsEnabled(true);
+    final ArrayList> rows = scanner.nextRows().join();
+
+    assertSizeIs(2, rows);
+    ArrayList kvs1 = rows.get(0);
+    assertSizeIs(1, kvs1);
+    assertEq("v1", kvs1.get(0).value());
+    ArrayList kvs4 = rows.get(1);
+    assertSizeIs(1, kvs4);
+    assertEq("v4", kvs4.get(0).value());
+
+    Scanner.ScanMetrics metrics = scanner.getScanMetrics();
+    assertEquals("incorrect count of rows scanned", 4, metrics.getCountOfRowsScanned());
+    assertEquals("incorrect count of rows filtered", 2, metrics.getCountOfRowsFiltered());
+    assertEquals("incorrect count of RPC calls", 1, metrics.getCountOfRPCcalls()); // 1 open
+
+    scanner.close().join();
+  }
+
   /** Regression test for issue #2. */
   @Test
   public void regression2() throws Exception {

From 6bcce6e27ca38554e3700550252ca592c8046fb1 Mon Sep 17 00:00:00 2001
From: Yusuke Yasuda 
Date: Fri, 1 Dec 2017 17:36:04 +0900
Subject: [PATCH 11/13] delete countOfRPCRetries

---
 src/Scanner.java          | 36 +++++++++++++++++-------------------
 test/TestIntegration.java |  3 +--
 2 files changed, 18 insertions(+), 21 deletions(-)

diff --git a/src/Scanner.java b/src/Scanner.java
index 39cdda4c..2e1dbb6d 100644
--- a/src/Scanner.java
+++ b/src/Scanner.java
@@ -1764,6 +1764,10 @@ private void updateSumOfMillisSecBetweenNexts(long currentTime) {
     }
   }
 
+  /**
+   * Server-side metrics.
+   * Only supported for HBase 0.95 and above.
+   */
   private static class ServerSideScanMetrics {
 
     public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
@@ -1773,18 +1777,24 @@ private static class ServerSideScanMetrics {
     protected long count_of_rows_filtered = 0;
 
     /**
-     * number of rows scanned during scan RPC. Not every row scanned will be returned to the client
+     * Number of rows scanned during scan RPC.
+     * Not every row scanned will be returned to the client
      * since rows may be filtered.
+     * Always returns 0 if HBase < 0.95.
      */
     public long getCountOfRowsScanned() { return count_of_rows_scanned; };
 
     /**
-     * number of rows filtered during scan RPC
+     * Number of rows filtered during scan RPC.
+     * Always returns 0 if HBase < 0.95.
      */
     public long getCountOfRowsFiltered() { return count_of_rows_filtered; }
 
   }
 
+  /**
+   * Client-side metrics.
+   */
   public static class ScanMetrics extends ServerSideScanMetrics {
 
     public static final String RPC_CALLS_METRIC_NAME = "RPC_CALLS";
@@ -1807,42 +1817,31 @@ public static class ScanMetrics extends ServerSideScanMetrics {
      */
     private long count_of_regions = 1;
 
-    private long count_of_rpc_retries = 0;
-
     /**
-     * number of RPC calls
+     * Number of RPC calls.
      */
     public long getCountOfRPCcalls() { return count_of_rpc_calls; }
 
     /**
-     * sum of milliseconds between sequential next calls
+     * Sum of milliseconds between sequential next calls.
      */
     public long getSumOfMillisSecBetweenNexts() { return sum_of_millis_sec_between_nexts; }
 
     /**
-     * number of NotServingRegionException caught
+     * Number of NotServingRegionException caught.
      */
     public long getCountOfNSRE() { return count_of_nsre; }
 
     /**
-     * number of bytes in Result objects from region servers
+     * Number of bytes in Result objects from region servers.
      */
     public long getCountOfBytesInResults() { return count_of_bytes_in_results; }
 
     /**
-     * number of regions
-     * Starts with 1 because it is incremented when a scanner switches to a next region.
+     * Number of regions.
      */
     public long getCountOfRegions() { return count_of_regions; };
 
-    /**
-     * number of RPC retries
-     */
-    public long getCountOfRPCRetries() { return count_of_rpc_retries; }
-
-    /**
-     * constructor
-     */
     public ScanMetrics() {
     }
 
@@ -1855,7 +1854,6 @@ public Map getMetricsMap() {
       builder.put(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME, count_of_nsre);
       builder.put(BYTES_IN_RESULTS_METRIC_NAME, count_of_bytes_in_results);
       builder.put(REGIONS_SCANNED_METRIC_NAME, count_of_regions);
-      builder.put(RPC_RETRIES_METRIC_NAME, count_of_rpc_retries);
       return builder.build();
     }
   }
diff --git a/test/TestIntegration.java b/test/TestIntegration.java
index 7216aaad..a2d4266d 100644
--- a/test/TestIntegration.java
+++ b/test/TestIntegration.java
@@ -2215,7 +2215,6 @@ public void scanMetrics() throws Exception {
     assertEquals("incorrect count of rows scanned", 1, metrics_a1.getCountOfRowsScanned()); // + 1 row scanned
     assertEquals("incorrect count of RPC calls", 1, metrics_a1.getCountOfRPCcalls()); // + 1 open
     assertTrue("incorrect count of bytes in results", metrics_a1.getCountOfBytesInResults() > prevBytesInResult);
-    assertEquals("incorrect count of NotServingRegionException", 0, metrics_a1.getCountOfNSRE());
     prevBytesInResult = metrics_a1.getCountOfBytesInResults();
 
     ArrayList> row_a2 = scanner.nextRows(1).join();
@@ -2264,7 +2263,7 @@ public void scanMetrics() throws Exception {
 
   /** Scan metrics of filtered rows tests. */
   @Test
-  public void scanMetricsFilter() throws Exception {
+  public void scanMetricsWithFilter() throws Exception {
     client.setFlushInterval(FAST_FLUSH);
     // Keep only rows with a column qualifier that starts with "qa".
     final PutRequest put1 = new PutRequest(table, "cpf1", family, "qa1", "v1");

From f09ed7795f5f6874120a49d5c9f98449759ecd7c Mon Sep 17 00:00:00 2001
From: Yusuke Yasuda 
Date: Fri, 1 Dec 2017 18:24:48 +0900
Subject: [PATCH 12/13] skip server-side metrics check for HBase < 0.95

---
 src/Scanner.java          |  8 +++++-
 test/TestIntegration.java | 51 +++++++++++++++++++++++++++++++++------
 2 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/src/Scanner.java b/src/Scanner.java
index 2e1dbb6d..0c87d425 100644
--- a/src/Scanner.java
+++ b/src/Scanner.java
@@ -1794,6 +1794,13 @@ private static class ServerSideScanMetrics {
 
   /**
    * Client-side metrics.
+   * 

+ * This class is immutable. You can get updated values by calling + * this function again once RPC is completed. + * This class is not synchronized because + * fields of this class is updated only inside the Deferred callbacks + * by {@code Scanner}. + *

*/ public static class ScanMetrics extends ServerSideScanMetrics { @@ -1802,7 +1809,6 @@ public static class ScanMetrics extends ServerSideScanMetrics { public static final String NOT_SERVING_REGION_EXCEPTION_METRIC_NAME = "NOT_SERVING_REGION_EXCEPTION"; public static final String BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS"; public static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED"; - public static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES"; private long count_of_rpc_calls = 0; diff --git a/test/TestIntegration.java b/test/TestIntegration.java index a2d4266d..9219de24 100644 --- a/test/TestIntegration.java +++ b/test/TestIntegration.java @@ -2212,7 +2212,12 @@ public void scanMetrics() throws Exception { assertSizeIs(1, row_a1); Scanner.ScanMetrics metrics_a1 = scanner.getScanMetrics(); assertEquals("incorrect count of regions", 1, metrics_a1.getCountOfRegions()); - assertEquals("incorrect count of rows scanned", 1, metrics_a1.getCountOfRowsScanned()); // + 1 row scanned + // server-side metrics works for HBase > 0.94 only + if (metrics_a1.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 1, metrics_a1.getCountOfRowsScanned()); // + 1 row scanned + } assertEquals("incorrect count of RPC calls", 1, metrics_a1.getCountOfRPCcalls()); // + 1 open assertTrue("incorrect count of bytes in results", metrics_a1.getCountOfBytesInResults() > prevBytesInResult); prevBytesInResult = metrics_a1.getCountOfBytesInResults(); @@ -2221,7 +2226,12 @@ public void scanMetrics() throws Exception { assertSizeIs(1, row_a2); Scanner.ScanMetrics metrics_a2 = scanner.getScanMetrics(); assertEquals("incorrect count of regions", 1, metrics_a2.getCountOfRegions()); - assertEquals("incorrect count of rows scanned", 2, metrics_a2.getCountOfRowsScanned()); // + 1 row scanned + // server-side metrics works for HBase > 0.94 only + if (metrics_a2.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 2, metrics_a2.getCountOfRowsScanned()); // + 1 row scanned + } assertEquals("incorrect count of RPC calls", 2, metrics_a2.getCountOfRPCcalls()); // + 1 next assertTrue("incorrect count of bytes in results", metrics_a2.getCountOfBytesInResults() > prevBytesInResult); prevBytesInResult = metrics_a2.getCountOfBytesInResults(); @@ -2230,7 +2240,12 @@ public void scanMetrics() throws Exception { assertSizeIs(1, row_b1); Scanner.ScanMetrics metrics_b1 = scanner.getScanMetrics(); assertEquals("incorrect count of regions", 2, metrics_b1.getCountOfRegions()); // + next region - assertEquals("incorrect count of rows scanned", 3, metrics_b1.getCountOfRowsScanned()); // + 1 row scanned + // server-side metrics works for HBase > 0.94 only + if (metrics_b1.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 3, metrics_b1.getCountOfRowsScanned()); // + 1 row scanned + } assertEquals("incorrect count of RPC calls", 5, metrics_b1.getCountOfRPCcalls()); // + 1 empty next + 1 close + 1 open assertTrue("incorrect count of bytes in results", metrics_b1.getCountOfBytesInResults() > prevBytesInResult); prevBytesInResult = metrics_b1.getCountOfBytesInResults(); @@ -2239,7 +2254,12 @@ public void scanMetrics() throws Exception { assertSizeIs(1, row_b2); Scanner.ScanMetrics metrics_b2 = scanner.getScanMetrics(); assertEquals("incorrect count of regions", 2, metrics_b2.getCountOfRegions()); - assertEquals("incorrect count of rows scanned", 4, metrics_b2.getCountOfRowsScanned()); // + 1 row scanned + // server-side metrics works for HBase > 0.94 only + if (metrics_b2.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 4, metrics_b2.getCountOfRowsScanned()); // + 1 row scanned + } assertEquals("incorrect count of RPC calls", 6, metrics_b2.getCountOfRPCcalls()); // + 1 next assertTrue("incorrect count of bytes in results", metrics_b2.getCountOfBytesInResults() > prevBytesInResult); prevBytesInResult = metrics_b2.getCountOfBytesInResults(); @@ -2248,7 +2268,12 @@ public void scanMetrics() throws Exception { assertSizeIs(1, row_c); Scanner.ScanMetrics metrics_c = scanner.getScanMetrics(); assertEquals("incorrect count of regions", 3, metrics_c.getCountOfRegions()); // + next region - assertEquals("incorrect count of rows scanned", 5, metrics_c.getCountOfRowsScanned()); // + 1 row scanned + // server-side metrics works for HBase > 0.94 only + if (metrics_c.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 5, metrics_c.getCountOfRowsScanned()); // + 1 row scanned + } assertEquals("incorrect count of RPC calls", 9, metrics_c.getCountOfRPCcalls()); // + 1 empty next + 1 close + 1 open assertTrue("incorrect count of bytes in results", metrics_c.getCountOfBytesInResults() > prevBytesInResult); prevBytesInResult = metrics_c.getCountOfBytesInResults(); @@ -2256,7 +2281,12 @@ public void scanMetrics() throws Exception { scanner.close().join(); Scanner.ScanMetrics metrics_final = scanner.getScanMetrics(); assertEquals("incorrect count of regions", 3, metrics_final.getCountOfRegions()); - assertEquals("incorrect count of rows scanned", 5, metrics_final.getCountOfRowsScanned()); + // server-side metrics works for HBase > 0.94 only + if (metrics_final.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 5, metrics_final.getCountOfRowsScanned()); + } assertEquals("incorrect count of RPC calls", 10, metrics_final.getCountOfRPCcalls()); // + 1 close assertTrue("incorrect count of bytes in results", metrics_final.getCountOfBytesInResults() == prevBytesInResult); } @@ -2289,8 +2319,13 @@ public void scanMetricsWithFilter() throws Exception { assertEq("v4", kvs4.get(0).value()); Scanner.ScanMetrics metrics = scanner.getScanMetrics(); - assertEquals("incorrect count of rows scanned", 4, metrics.getCountOfRowsScanned()); - assertEquals("incorrect count of rows filtered", 2, metrics.getCountOfRowsFiltered()); + // server-side metrics works for HBase > 0.94 only + if (metrics.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 4, metrics.getCountOfRowsScanned()); + assertEquals("incorrect count of rows filtered", 2, metrics.getCountOfRowsFiltered()); + } assertEquals("incorrect count of RPC calls", 1, metrics.getCountOfRPCcalls()); // 1 open scanner.close().join(); From 72a18b90a91cb90c8164e1acc30c6f4977c2c94f Mon Sep 17 00:00:00 2001 From: Yusuke Yasuda Date: Wed, 28 Feb 2018 12:31:38 +0900 Subject: [PATCH 13/13] add RPC_RETRIES metrics --- src/Scanner.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/Scanner.java b/src/Scanner.java index 0c87d425..7585afde 100644 --- a/src/Scanner.java +++ b/src/Scanner.java @@ -884,6 +884,7 @@ public Object call(final Object error) { invalidate(); // If there was an error, don't assume we're still OK. if (error instanceof NotServingRegionException) { incCountOfNSRE(); + incCountOfRPCRetries(); // We'll resume scanning on another region, and we want to pick up // right after the last key we successfully returned. Padding the // last key with an extra 0 gives us the next possible key. @@ -908,6 +909,7 @@ public Object call(final Object error) { + " been holding the scanner open and idle for too long (possibly" + " due to a long GC pause on your side or in the RegionServer)", error); + incCountOfRPCRetries(); // Let's re-open ourselves and keep scanning. return nextRows(); // XXX dangerous endless retry } @@ -1751,6 +1753,12 @@ private void incCountOfNSRE() { } } + private void incCountOfRPCRetries() { + if (isScanMetricsEnabled()) { + this.scanMetrics.count_of_rpc_retries += 1; + } + } + private void incCountOfRegions() { if (isScanMetricsEnabled()) { this.scanMetrics.count_of_regions += 1; @@ -1809,6 +1817,7 @@ public static class ScanMetrics extends ServerSideScanMetrics { public static final String NOT_SERVING_REGION_EXCEPTION_METRIC_NAME = "NOT_SERVING_REGION_EXCEPTION"; public static final String BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS"; public static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED"; + public static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES"; private long count_of_rpc_calls = 0; @@ -1816,7 +1825,9 @@ public static class ScanMetrics extends ServerSideScanMetrics { private long count_of_nsre = 0; - private long count_of_bytes_in_results = 0; + private long count_of_bytes_in_results = 0; + + private long count_of_rpc_retries = 0; /** * Starts with 1 because it is incremented when a scanner switches to a next region. @@ -1848,6 +1859,11 @@ public static class ScanMetrics extends ServerSideScanMetrics { */ public long getCountOfRegions() { return count_of_regions; }; + /** + * number of RPC retries + */ + public long getCountOfRPCRetries() { return count_of_rpc_retries; } + public ScanMetrics() { } @@ -1860,6 +1876,7 @@ public Map getMetricsMap() { builder.put(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME, count_of_nsre); builder.put(BYTES_IN_RESULTS_METRIC_NAME, count_of_bytes_in_results); builder.put(REGIONS_SCANNED_METRIC_NAME, count_of_regions); + builder.put(RPC_RETRIES_METRIC_NAME, count_of_rpc_retries); return builder.build(); } }