HBASE-25643 The delayed FlushRegionEntry should be removed when we ne… (#3049)

Signed-off-by: AnoopSamJohn <anoopsamjohn@apache.org>
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
bsglz 2021-03-19 03:13:06 +08:00 committed by stack
parent 9e701bb1d3
commit 19d094b96f
3 changed files with 145 additions and 24 deletions

View File

@ -8380,6 +8380,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
flushesQueued.increment(); flushesQueued.increment();
} }
protected void decrementFlushesQueuedCount() {
flushesQueued.decrement();
}
/** /**
* If a handler thread is eligible for interrupt, make it ineligible. Should be paired * If a handler thread is eligible for interrupt, make it ineligible. Should be paired
* with {{@link #enableInterrupts()}. * with {{@link #enableInterrupts()}.

View File

@ -72,7 +72,7 @@ class MemStoreFlusher implements FlushRequester {
// These two data members go together. Any entry in the one must have // These two data members go together. Any entry in the one must have
// a corresponding entry in the other. // a corresponding entry in the other.
private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>(); private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
private final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>(); protected final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
private AtomicBoolean wakeupPending = new AtomicBoolean(); private AtomicBoolean wakeupPending = new AtomicBoolean();
private final long threadWakeFrequency; private final long threadWakeFrequency;
@ -126,20 +126,22 @@ class MemStoreFlusher implements FlushRequester {
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
90000); 90000);
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
if (handlerCount < 1) { if (server != null) {
LOG.warn("hbase.hstore.flusher.count was configed to {} which is less than 1, corrected to 1", if (handlerCount < 1) {
handlerCount); LOG.warn("hbase.hstore.flusher.count was configed to {} which is less than 1, "
handlerCount = 1; + "corrected to 1", handlerCount);
handlerCount = 1;
}
LOG.info("globalMemStoreLimit="
+ TraditionalBinaryPrefix
.long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
+ ", globalMemStoreLimitLowMark="
+ TraditionalBinaryPrefix.long2String(
this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
+ ", Offheap="
+ (this.server.getRegionServerAccounting().isOffheap()));
} }
this.flushHandlers = new FlushHandler[handlerCount]; this.flushHandlers = new FlushHandler[handlerCount];
LOG.info("globalMemStoreLimit="
+ TraditionalBinaryPrefix
.long2String(this.server.getRegionServerAccounting().getGlobalMemStoreLimit(), "", 1)
+ ", globalMemStoreLimitLowMark="
+ TraditionalBinaryPrefix.long2String(
this.server.getRegionServerAccounting().getGlobalMemStoreLimitLowMark(), "", 1)
+ ", Offheap="
+ (this.server.getRegionServerAccounting().isOffheap()));
} }
public LongAdder getUpdatesBlockedMsHighWater() { public LongAdder getUpdatesBlockedMsHighWater() {
@ -462,18 +464,28 @@ class MemStoreFlusher implements FlushRequester {
public boolean requestFlush(HRegion r, List<byte[]> families, public boolean requestFlush(HRegion r, List<byte[]> families,
FlushLifeCycleTracker tracker) { FlushLifeCycleTracker tracker) {
synchronized (regionsInQueue) { synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) { FlushRegionEntry existFqe = regionsInQueue.get(r);
// This entry has no delay so it will be added at the top of the flush if (existFqe != null) {
// queue. It'll come out near immediately. // if a delayed one exists and not reach the time to execute, just remove it
FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker); if (existFqe.isDelay() && existFqe.whenToExpire > EnvironmentEdgeManager.currentTime()) {
this.regionsInQueue.put(r, fqe); LOG.info("Remove the existing delayed flush entry for {}, "
this.flushQueue.add(fqe); + "because we need to flush it immediately", r);
r.incrementFlushesQueuedCount(); this.regionsInQueue.remove(r);
return true; this.flushQueue.remove(existFqe);
} else { r.decrementFlushesQueuedCount();
tracker.notExecuted("Flush already requested on " + r); } else {
return false; tracker.notExecuted("Flush already requested on " + r);
return false;
}
} }
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
FlushRegionEntry fqe = new FlushRegionEntry(r, families, tracker);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
r.incrementFlushesQueuedCount();
return true;
} }
} }
@ -868,6 +880,13 @@ class MemStoreFlusher implements FlushRequester {
return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
} }
/**
* @return True if the entry is a delay flush task
*/
protected boolean isDelay() {
return this.whenToExpire > this.createTime;
}
/** /**
* @return Count of times {@link #requeue(long)} was called; i.e this is * @return Count of times {@link #requeue(long)} was called; i.e this is
* number of times we've been requeued. * number of times we've been requeued.

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({ RegionServerTests.class, SmallTests.class })
public class TestMemStoreFlusher {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMemStoreFlusher.class);
@Rule
public TestName name = new TestName();
public MemStoreFlusher msf;
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
conf.set("hbase.hstore.flusher.count", "0");
msf = new MemStoreFlusher(conf, null);
}
@Test
public void testReplaceDelayedFlushEntry() {
RegionInfo hri =
RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setRegionId(1)
.setReplicaId(0).build();
HRegion r = mock(HRegion.class);
doReturn(hri).when(r).getRegionInfo();
// put a delayed task with 30s delay
msf.requestDelayedFlush(r, 30000);
assertEquals(1, msf.getFlushQueueSize());
assertTrue(msf.regionsInQueue.get(r).isDelay());
// put a non-delayed task, then the delayed one should be replaced
assertTrue(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY));
assertEquals(1, msf.getFlushQueueSize());
assertFalse(msf.regionsInQueue.get(r).isDelay());
}
@Test
public void testNotReplaceDelayedFlushEntryWhichExpired() {
RegionInfo hri =
RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setRegionId(1)
.setReplicaId(0).build();
HRegion r = mock(HRegion.class);
doReturn(hri).when(r).getRegionInfo();
// put a delayed task with 100ms delay
msf.requestDelayedFlush(r, 100);
assertEquals(1, msf.getFlushQueueSize());
assertTrue(msf.regionsInQueue.get(r).isDelay());
Threads.sleep(200);
// put a non-delayed task, and the delayed one is expired, so it should not be replaced
assertFalse(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY));
assertEquals(1, msf.getFlushQueueSize());
assertTrue(msf.regionsInQueue.get(r).isDelay());
}
}