From 19d094b96ffc322bc5b404f974404e6488b066e9 Mon Sep 17 00:00:00 2001 From: bsglz <18031031@qq.com> Date: Fri, 19 Mar 2021 03:13:06 +0800 Subject: [PATCH] =?UTF-8?q?HBASE-25643=20The=20delayed=20FlushRegionEntry?= =?UTF-8?q?=20should=20be=20removed=20when=20we=20ne=E2=80=A6=20(#3049)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: AnoopSamJohn Signed-off-by: stack --- .../hadoop/hbase/regionserver/HRegion.java | 4 + .../hbase/regionserver/MemStoreFlusher.java | 67 ++++++++----- .../regionserver/TestMemStoreFlusher.java | 98 +++++++++++++++++++ 3 files changed, 145 insertions(+), 24 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 09cc6482732..5bfeff72ad9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -8380,6 +8380,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi flushesQueued.increment(); } + protected void decrementFlushesQueuedCount() { + flushesQueued.decrement(); + } + /** * If a handler thread is eligible for interrupt, make it ineligible. Should be paired * with {{@link #enableInterrupts()}. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 1f6a3507e94..a35a0f120c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -72,7 +72,7 @@ class MemStoreFlusher implements FlushRequester { // These two data members go together. Any entry in the one must have // a corresponding entry in the other. private final BlockingQueue flushQueue = new DelayQueue<>(); - private final Map regionsInQueue = new HashMap<>(); + protected final Map regionsInQueue = new HashMap<>(); private AtomicBoolean wakeupPending = new AtomicBoolean(); private final long threadWakeFrequency; @@ -126,20 +126,22 @@ class MemStoreFlusher implements FlushRequester { this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); - if (handlerCount < 1) { - LOG.warn("hbase.hstore.flusher.count was configed to {} which is less than 1, corrected to 1", - handlerCount); - handlerCount = 1; + if (server != null) { + if (handlerCount < 1) { + LOG.warn("hbase.hstore.flusher.count was configed to {} which is less than 1, " + + "corrected to 1", handlerCount); + handlerCount = 1; + } + LOG.info("globalMemStoreLimit=" + + TraditionalBinaryPrefix + .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1) + + ", globalMemStoreLimitLowMark=" + + TraditionalBinaryPrefix.long2String( + this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1) + + ", Offheap=" + + (this.server.getRegionServerAccounting().isOffheap())); } this.flushHandlers = new FlushHandler[handlerCount]; - LOG.info("globalMemStoreLimit=" - + TraditionalBinaryPrefix - .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1) - + ", globalMemStoreLimitLowMark=" - + TraditionalBinaryPrefix.long2String( - this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1) - + ", Offheap=" - + (this.server.getRegionServerAccounting().isOffheap())); } public LongAdder getUpdatesBlockedMsHighWater() { @@ -462,18 +464,28 @@ class MemStoreFlusher implements FlushRequester { public boolean requestFlush(HRegion r, List families, FlushLifeCycleTracker tracker) { synchronized (regionsInQueue) { - if (!regionsInQueue.containsKey(r)) { - // This entry has no delay so it will be added at the top of the flush - // queue. It'll come out near immediately. - FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker); - this.regionsInQueue.put(r, fqe); - this.flushQueue.add(fqe); - r.incrementFlushesQueuedCount(); - return true; - } else { - tracker.notExecuted("Flush already requested on " + r); - return false; + FlushRegionEntry existFqe = regionsInQueue.get(r); + if (existFqe != null) { + // if a delayed one exists and not reach the time to execute, just remove it + if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) { + LOG.info("Remove the existing delayed flush entry for {}, " + + "because we need to flush it immediately", r); + this.regionsInQueue.remove(r); + this.flushQueue.remove(existFqe); + r.decrementFlushesQueuedCount(); + } else { + tracker.notExecuted("Flush already requested on " + r); + return false; + } } + + // This entry has no delay so it will be added at the top of the flush + // queue. It'll come out near immediately. + FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker); + this.regionsInQueue.put(r, fqe); + this.flushQueue.add(fqe); + r.incrementFlushesQueuedCount(); + return true; } } @@ -868,6 +880,13 @@ class MemStoreFlusher implements FlushRequester { return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; } + /** + * @return True if the entry is a delay flush task + */ + protected boolean isDelay() { + return this.whenToExpire > this.createTime; + } + /** * @return Count of times {@link #requeue(long)} was called; i.e this is * number of times we've been requeued. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java new file mode 100644 index 00000000000..bc3df0ab805 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestMemStoreFlusher { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMemStoreFlusher.class); + + @Rule + public TestName name = new TestName(); + + public MemStoreFlusher msf; + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + conf.set("hbase.hstore.flusher.count", "0"); + msf = new MemStoreFlusher(conf, null); + } + + @Test + public void testReplaceDelayedFlushEntry() { + RegionInfo hri = + RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setRegionId(1) + .setReplicaId(0).build(); + HRegion r = mock(HRegion.class); + doReturn(hri).when(r).getRegionInfo(); + + // put a delayed task with 30s delay + msf.requestDelayedFlush(r, 30000); + assertEquals(1, msf.getFlushQueueSize()); + assertTrue(msf.regionsInQueue.get(r).isDelay()); + + // put a non-delayed task, then the delayed one should be replaced + assertTrue(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY)); + assertEquals(1, msf.getFlushQueueSize()); + assertFalse(msf.regionsInQueue.get(r).isDelay()); + } + + @Test + public void testNotReplaceDelayedFlushEntryWhichExpired() { + RegionInfo hri = + RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setRegionId(1) + .setReplicaId(0).build(); + HRegion r = mock(HRegion.class); + doReturn(hri).when(r).getRegionInfo(); + + // put a delayed task with 100ms delay + msf.requestDelayedFlush(r, 100); + assertEquals(1, msf.getFlushQueueSize()); + assertTrue(msf.regionsInQueue.get(r).isDelay()); + + Threads.sleep(200); + + // put a non-delayed task, and the delayed one is expired, so it should not be replaced + assertFalse(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY)); + assertEquals(1, msf.getFlushQueueSize()); + assertTrue(msf.regionsInQueue.get(r).isDelay()); + } +}