From da9f6bf81877293df2af9d564d7ee51d088c087d Mon Sep 17 00:00:00 2001 From: Karthik Palanisamy Date: Sat, 23 Nov 2019 07:13:47 -0800 Subject: [PATCH] HBASE-23237 Prevent Negative values in metrics requestsPerSecond (#866) Signed-off-by: Guangxu Cheng Signed-off-by: Josh Elser --- .../hbase/regionserver/HRegionServer.java | 6 +- .../MetricsRegionServerWrapperImpl.java | 58 ++++++++---- .../TestRequestsPerSecondMetric.java | 92 +++++++++++++++++++ 3 files changed, 138 insertions(+), 18 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRequestsPerSecondMetric.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2ba44292607..86245841a11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -369,6 +369,7 @@ public class HRegionServer extends HasThread implements public static final String REGIONSERVER = "regionserver"; MetricsRegionServer metricsRegionServer; + MetricsRegionServerWrapperImpl metricsRegionServerImpl; MetricsTable metricsTable; private SpanReceiverHost spanReceiverHost; @@ -1496,8 +1497,8 @@ public class HRegionServer extends HasThread implements this.cacheConfig = new CacheConfig(conf); this.walFactory = setupWALAndReplication(); // Init in here rather than in constructor after thread name has been set - this.metricsRegionServer = new MetricsRegionServer( - new MetricsRegionServerWrapperImpl(this), conf); + this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this); + this.metricsRegionServer = new MetricsRegionServer(metricsRegionServerImpl, conf); this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); // Now that we have a metrics source, start the pause monitor this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource()); @@ -3114,6 +3115,7 @@ public class HRegionServer extends HasThread implements @Override public boolean removeFromOnlineRegions(final Region r, ServerName destination) { Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); + metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName()); if (destination != null) { long closeSeqNum = r.getMaxFlushedSeqId(); if (closeSeqNum == HConstants.NO_SEQNUM) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index 061267d8d32..948c255df28 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -84,6 +87,8 @@ class MetricsRegionServerWrapperImpl private volatile long majorCompactedCellsSize = 0; private volatile long blockedRequestsCount = 0L; private volatile long averageRegionSize = 0L; + protected final Map> + requestsCountCache = new ConcurrentHashMap>(); private CacheStats cacheStats; private CacheStats l1Stats = null; @@ -586,9 +591,6 @@ class MetricsRegionServerWrapperImpl public class RegionServerMetricsWrapperRunnable implements Runnable { private long lastRan = 0; - private long lastRequestCount = 0; - private long lastReadRequestsCount = 0; - private long lastWriteRequestsCount = 0; @Override synchronized public void run() { @@ -628,7 +630,40 @@ class MetricsRegionServerWrapperImpl long tempBlockedRequestsCount = 0L; int regionCount = 0; - for (Region r : regionServer.getOnlineRegionsLocalContext()) { + + long currentReadRequestsCount = 0; + long currentWriteRequestsCount = 0; + long lastReadRequestsCount = 0; + long lastWriteRequestsCount = 0; + long readRequestsDelta = 0; + long writeRequestsDelta = 0; + long totalReadRequestsDelta = 0; + long totalWriteRequestsDelta = 0; + String encodedRegionName; + for (Region r : regionServer.getOnlineRegionsLocalContext()) { + encodedRegionName = r.getRegionInfo().getEncodedName(); + currentReadRequestsCount = r.getReadRequestsCount(); + currentWriteRequestsCount = r.getWriteRequestsCount(); + if (requestsCountCache.containsKey(encodedRegionName)) { + lastReadRequestsCount = requestsCountCache.get(encodedRegionName).get(0); + lastWriteRequestsCount = requestsCountCache.get(encodedRegionName).get(1); + readRequestsDelta = currentReadRequestsCount - lastReadRequestsCount; + writeRequestsDelta = currentWriteRequestsCount - lastWriteRequestsCount; + totalReadRequestsDelta += readRequestsDelta; + totalWriteRequestsDelta += writeRequestsDelta; + //Update cache for our next comparision + requestsCountCache.get(encodedRegionName).set(0,currentReadRequestsCount); + requestsCountCache.get(encodedRegionName).set(1,currentWriteRequestsCount); + } else { + // List[0] -> readRequestCount + // List[1] -> writeRequestCount + ArrayList requests = new ArrayList(2); + requests.add(currentReadRequestsCount); + requests.add(currentWriteRequestsCount); + requestsCountCache.put(encodedRegionName, requests); + totalReadRequestsDelta += currentReadRequestsCount; + totalWriteRequestsDelta += currentWriteRequestsCount; + } tempNumMutationsWithoutWAL += r.getNumMutationsWithoutWAL(); tempDataInMemoryWithoutWAL += r.getDataInMemoryWithoutWAL(); tempReadRequestsCount += r.getReadRequestsCount(); @@ -697,25 +732,16 @@ class MetricsRegionServerWrapperImpl // If we've time traveled keep the last requests per second. if ((currentTime - lastRan) > 0) { - long currentRequestCount = getTotalRowActionRequestCount(); - requestsPerSecond = (currentRequestCount - lastRequestCount) / + requestsPerSecond = (totalReadRequestsDelta + totalWriteRequestsDelta) / ((currentTime - lastRan) / 1000.0); - lastRequestCount = currentRequestCount; - long intervalReadRequestsCount = tempReadRequestsCount - lastReadRequestsCount; - long intervalWriteRequestsCount = tempWriteRequestsCount - lastWriteRequestsCount; - - double readRequestsRatePerMilliSecond = ((double)intervalReadRequestsCount/ + double readRequestsRatePerMilliSecond = ((double)totalReadRequestsDelta/ (double)period); - double writeRequestsRatePerMilliSecond = ((double)intervalWriteRequestsCount/ + double writeRequestsRatePerMilliSecond = ((double)totalWriteRequestsDelta/ (double)period); readRequestsRate = readRequestsRatePerMilliSecond * 1000.0; writeRequestsRate = writeRequestsRatePerMilliSecond * 1000.0; - - lastReadRequestsCount = tempReadRequestsCount; - lastWriteRequestsCount = tempWriteRequestsCount; - } lastRan = currentTime; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRequestsPerSecondMetric.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRequestsPerSecondMetric.java new file mode 100644 index 00000000000..6f27fd786fb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRequestsPerSecondMetric.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Validate requestsPerSecond metric. + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestRequestsPerSecondMetric { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final long METRICS_PERIOD = 2000L; + private static Configuration conf; + + + @BeforeClass + public static void setup() throws Exception { + conf = UTIL.getConfiguration(); + conf.setLong(HConstants.REGIONSERVER_METRICS_PERIOD, METRICS_PERIOD); + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void teardown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + + @Test + /** + * This test will confirm no negative value in requestsPerSecond metric during any region + * transition(close region/remove region/move region). + * Firstly, load 2000 random rows for 25 regions and will trigger a metric. + * Now, metricCache will have a current read and write requests count. + * Next, we disable a table and all of its 25 regions will be closed. + * As part of region close, his metric will also be removed from metricCache. + * prior to HBASE-23237, we do not remove/reset his metric so we incorrectly compute + * (currentRequestCount - lastRequestCount) which result into negative value. + * + * @throws IOException + * @throws InterruptedException + */ + public void testNoNegativeSignAtRequestsPerSecond() throws IOException, InterruptedException { + final TableName TABLENAME = TableName.valueOf("t"); + final String FAMILY = "f"; + Admin admin = UTIL.getHBaseAdmin(); + UTIL.createMultiRegionTable(TABLENAME, FAMILY.getBytes(),25); + Table table = admin.getConnection().getTable(TABLENAME); + HRegionServer regionServer = UTIL.getMiniHBaseCluster().getRegionServer(0); + MetricsRegionServerWrapperImpl metricsWrapper = + new MetricsRegionServerWrapperImpl(regionServer); + MetricsRegionServerWrapperImpl.RegionServerMetricsWrapperRunnable metricsServer + = metricsWrapper.new RegionServerMetricsWrapperRunnable(); + metricsServer.run(); + UTIL.loadNumericRows(table, FAMILY.getBytes(), 1, 2000); + Thread.sleep(METRICS_PERIOD); + metricsServer.run(); + admin.disableTable(TABLENAME); + Thread.sleep(METRICS_PERIOD); + metricsServer.run(); + Assert.assertTrue(metricsWrapper.getRequestsPerSecond() > -1); + } +}