diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a07bb15e254..4faecde472d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -8417,6 +8417,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi flushesQueued.increment(); } + protected void decrementFlushesQueuedCount() { + flushesQueued.decrement(); + } + /** * If a handler thread is eligible for interrupt, make it ineligible. Should be paired * with {{@link #enableInterrupts()}. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 0a5ec917c74..61d54955295 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -73,7 +73,7 @@ class MemStoreFlusher implements FlushRequester { // These two data members go together. Any entry in the one must have // a corresponding entry in the other. private final BlockingQueue flushQueue = new DelayQueue<>(); - private final Map regionsInQueue = new HashMap<>(); + protected final Map regionsInQueue = new HashMap<>(); private AtomicBoolean wakeupPending = new AtomicBoolean(); private final long threadWakeFrequency; @@ -127,20 +127,22 @@ class MemStoreFlusher implements FlushRequester { this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); - if (handlerCount < 1) { - LOG.warn("hbase.hstore.flusher.count was configed to {} which is less than 1, corrected to 1", - handlerCount); - handlerCount = 1; + if (server != null) { + if (handlerCount < 1) { + LOG.warn("hbase.hstore.flusher.count was configed to {} which is less than 1, " + + "corrected to 1", handlerCount); + handlerCount = 1; + } + LOG.info("globalMemStoreLimit=" + + TraditionalBinaryPrefix + .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1) + + ", globalMemStoreLimitLowMark=" + + TraditionalBinaryPrefix.long2String( + this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1) + + ", Offheap=" + + (this.server.getRegionServerAccounting().isOffheap())); } this.flushHandlers = new FlushHandler[handlerCount]; - LOG.info("globalMemStoreLimit=" - + TraditionalBinaryPrefix - .long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1) - + ", globalMemStoreLimitLowMark=" - + TraditionalBinaryPrefix.long2String( - this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1) - + ", Offheap=" - + (this.server.getRegionServerAccounting().isOffheap())); } public LongAdder getUpdatesBlockedMsHighWater() { @@ -467,18 +469,28 @@ class MemStoreFlusher implements FlushRequester { public boolean requestFlush(HRegion r, List families, FlushLifeCycleTracker tracker) { synchronized (regionsInQueue) { - if (!regionsInQueue.containsKey(r)) { - // This entry has no delay so it will be added at the top of the flush - // queue. It'll come out near immediately. - FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker); - this.regionsInQueue.put(r, fqe); - this.flushQueue.add(fqe); - r.incrementFlushesQueuedCount(); - return true; - } else { - tracker.notExecuted("Flush already requested on " + r); - return false; + FlushRegionEntry existFqe = regionsInQueue.get(r); + if (existFqe != null) { + // if a delayed one exists and not reach the time to execute, just remove it + if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) { + LOG.info("Remove the existing delayed flush entry for {}, " + + "because we need to flush it immediately", r); + this.regionsInQueue.remove(r); + this.flushQueue.remove(existFqe); + r.decrementFlushesQueuedCount(); + } else { + tracker.notExecuted("Flush already requested on " + r); + return false; + } } + + // This entry has no delay so it will be added at the top of the flush + // queue. It'll come out near immediately. + FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker); + this.regionsInQueue.put(r, fqe); + this.flushQueue.add(fqe); + r.incrementFlushesQueuedCount(); + return true; } } @@ -873,6 +885,13 @@ class MemStoreFlusher implements FlushRequester { return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; } + /** + * @return True if the entry is a delay flush task + */ + protected boolean isDelay() { + return this.whenToExpire > this.createTime; + } + /** * @return Count of times {@link #requeue(long)} was called; i.e this is * number of times we've been requeued. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java new file mode 100644 index 00000000000..bc3df0ab805 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreFlusher.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestMemStoreFlusher { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMemStoreFlusher.class); + + @Rule + public TestName name = new TestName(); + + public MemStoreFlusher msf; + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + conf.set("hbase.hstore.flusher.count", "0"); + msf = new MemStoreFlusher(conf, null); + } + + @Test + public void testReplaceDelayedFlushEntry() { + RegionInfo hri = + RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setRegionId(1) + .setReplicaId(0).build(); + HRegion r = mock(HRegion.class); + doReturn(hri).when(r).getRegionInfo(); + + // put a delayed task with 30s delay + msf.requestDelayedFlush(r, 30000); + assertEquals(1, msf.getFlushQueueSize()); + assertTrue(msf.regionsInQueue.get(r).isDelay()); + + // put a non-delayed task, then the delayed one should be replaced + assertTrue(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY)); + assertEquals(1, msf.getFlushQueueSize()); + assertFalse(msf.regionsInQueue.get(r).isDelay()); + } + + @Test + public void testNotReplaceDelayedFlushEntryWhichExpired() { + RegionInfo hri = + RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setRegionId(1) + .setReplicaId(0).build(); + HRegion r = mock(HRegion.class); + doReturn(hri).when(r).getRegionInfo(); + + // put a delayed task with 100ms delay + msf.requestDelayedFlush(r, 100); + assertEquals(1, msf.getFlushQueueSize()); + assertTrue(msf.regionsInQueue.get(r).isDelay()); + + Threads.sleep(200); + + // put a non-delayed task, and the delayed one is expired, so it should not be replaced + assertFalse(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY)); + assertEquals(1, msf.getFlushQueueSize()); + assertTrue(msf.regionsInQueue.get(r).isDelay()); + } +}