HBASE-24991 Replace MovedRegionsCleaner with guava cache (#2357)
Signed-off-by: stack <stack@apache.org> Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
6371914adb
commit
58618e35ad
|
@ -277,6 +277,13 @@ public class HRegionServer extends Thread implements
|
||||||
private final Cache<Long, Long> executedRegionProcedures =
|
private final Cache<Long, Long> executedRegionProcedures =
|
||||||
CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
|
CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to cache the moved-out regions
|
||||||
|
*/
|
||||||
|
private final Cache<String, MovedRegionInfo> movedRegionInfoCache =
|
||||||
|
CacheBuilder.newBuilder().expireAfterWrite(movedRegionCacheExpiredTime(),
|
||||||
|
TimeUnit.MILLISECONDS).build();
|
||||||
|
|
||||||
private MemStoreFlusher cacheFlusher;
|
private MemStoreFlusher cacheFlusher;
|
||||||
|
|
||||||
private HeapMemoryManager hMemManager;
|
private HeapMemoryManager hMemManager;
|
||||||
|
@ -476,11 +483,6 @@ public class HRegionServer extends Thread implements
|
||||||
*/
|
*/
|
||||||
protected String clusterId;
|
protected String clusterId;
|
||||||
|
|
||||||
/**
|
|
||||||
* Chore to clean periodically the moved region list
|
|
||||||
*/
|
|
||||||
private MovedRegionsCleaner movedRegionsCleaner;
|
|
||||||
|
|
||||||
// chore for refreshing store files for secondary regions
|
// chore for refreshing store files for secondary regions
|
||||||
private StorefileRefresherChore storefileRefresher;
|
private StorefileRefresherChore storefileRefresher;
|
||||||
|
|
||||||
|
@ -1079,10 +1081,6 @@ public class HRegionServer extends Thread implements
|
||||||
mobFileCache.shutdown();
|
mobFileCache.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (movedRegionsCleaner != null) {
|
|
||||||
movedRegionsCleaner.stop("Region Server stopping");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send interrupts to wake up threads if sleeping so they notice shutdown.
|
// Send interrupts to wake up threads if sleeping so they notice shutdown.
|
||||||
// TODO: Should we check they are alive? If OOME could have exited already
|
// TODO: Should we check they are alive? If OOME could have exited already
|
||||||
if (this.hMemManager != null) this.hMemManager.stop();
|
if (this.hMemManager != null) this.hMemManager.stop();
|
||||||
|
@ -2051,9 +2049,6 @@ public class HRegionServer extends Thread implements
|
||||||
if (this.storefileRefresher != null) {
|
if (this.storefileRefresher != null) {
|
||||||
choreService.scheduleChore(storefileRefresher);
|
choreService.scheduleChore(storefileRefresher);
|
||||||
}
|
}
|
||||||
if (this.movedRegionsCleaner != null) {
|
|
||||||
choreService.scheduleChore(movedRegionsCleaner);
|
|
||||||
}
|
|
||||||
if (this.fsUtilizationChore != null) {
|
if (this.fsUtilizationChore != null) {
|
||||||
choreService.scheduleChore(fsUtilizationChore);
|
choreService.scheduleChore(fsUtilizationChore);
|
||||||
}
|
}
|
||||||
|
@ -2111,9 +2106,6 @@ public class HRegionServer extends Thread implements
|
||||||
slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
|
slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the thread to clean the moved regions list
|
|
||||||
movedRegionsCleaner = MovedRegionsCleaner.create(this);
|
|
||||||
|
|
||||||
if (this.nonceManager != null) {
|
if (this.nonceManager != null) {
|
||||||
// Create the scheduled chore that cleans up nonces.
|
// Create the scheduled chore that cleans up nonces.
|
||||||
nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
|
nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
|
||||||
|
@ -2614,7 +2606,6 @@ public class HRegionServer extends Thread implements
|
||||||
choreService.cancelChore(healthCheckChore);
|
choreService.cancelChore(healthCheckChore);
|
||||||
choreService.cancelChore(executorStatusChore);
|
choreService.cancelChore(executorStatusChore);
|
||||||
choreService.cancelChore(storefileRefresher);
|
choreService.cancelChore(storefileRefresher);
|
||||||
choreService.cancelChore(movedRegionsCleaner);
|
|
||||||
choreService.cancelChore(fsUtilizationChore);
|
choreService.cancelChore(fsUtilizationChore);
|
||||||
choreService.cancelChore(slowLogTableOpsChore);
|
choreService.cancelChore(slowLogTableOpsChore);
|
||||||
// clean up the remaining scheduled chores (in case we missed out any)
|
// clean up the remaining scheduled chores (in case we missed out any)
|
||||||
|
@ -3485,12 +3476,10 @@ public class HRegionServer extends Thread implements
|
||||||
private static class MovedRegionInfo {
|
private static class MovedRegionInfo {
|
||||||
private final ServerName serverName;
|
private final ServerName serverName;
|
||||||
private final long seqNum;
|
private final long seqNum;
|
||||||
private final long moveTime;
|
|
||||||
|
|
||||||
MovedRegionInfo(ServerName serverName, long closeSeqNum) {
|
MovedRegionInfo(ServerName serverName, long closeSeqNum) {
|
||||||
this.serverName = serverName;
|
this.serverName = serverName;
|
||||||
this.seqNum = closeSeqNum;
|
this.seqNum = closeSeqNum;
|
||||||
this.moveTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ServerName getServerName() {
|
public ServerName getServerName() {
|
||||||
|
@ -3500,18 +3489,8 @@ public class HRegionServer extends Thread implements
|
||||||
public long getSeqNum() {
|
public long getSeqNum() {
|
||||||
return seqNum;
|
return seqNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
long getMoveTime() {
|
|
||||||
return moveTime;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* This map will contains all the regions that we closed for a move.
|
|
||||||
* We add the time it was moved as we don't want to keep too old information
|
|
||||||
*/
|
|
||||||
private Map<String, MovedRegionInfo> movedRegions = new ConcurrentHashMap<>(3000);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* We need a timeout. If not there is a risk of giving a wrong information: this would double
|
* We need a timeout. If not there is a risk of giving a wrong information: this would double
|
||||||
* the number of network calls instead of reducing them.
|
* the number of network calls instead of reducing them.
|
||||||
|
@ -3525,86 +3504,23 @@ public class HRegionServer extends Thread implements
|
||||||
}
|
}
|
||||||
LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
|
LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" +
|
||||||
closeSeqNum);
|
closeSeqNum);
|
||||||
movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
|
movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
|
||||||
}
|
}
|
||||||
|
|
||||||
void removeFromMovedRegions(String encodedName) {
|
void removeFromMovedRegions(String encodedName) {
|
||||||
movedRegions.remove(encodedName);
|
movedRegionInfoCache.invalidate(encodedName);
|
||||||
}
|
}
|
||||||
|
|
||||||
private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
|
@VisibleForTesting
|
||||||
MovedRegionInfo dest = movedRegions.get(encodedRegionName);
|
public MovedRegionInfo getMovedRegion(String encodedRegionName) {
|
||||||
|
return movedRegionInfoCache.getIfPresent(encodedRegionName);
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
|
||||||
if (dest != null) {
|
|
||||||
if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
|
|
||||||
return dest;
|
|
||||||
} else {
|
|
||||||
movedRegions.remove(encodedRegionName);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@VisibleForTesting
|
||||||
* Remove the expired entries from the moved regions list.
|
public int movedRegionCacheExpiredTime() {
|
||||||
*/
|
|
||||||
protected void cleanMovedRegions() {
|
|
||||||
final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
|
|
||||||
|
|
||||||
movedRegions.entrySet().removeIf(e -> e.getValue().getMoveTime() < cutOff);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Use this to allow tests to override and schedule more frequently.
|
|
||||||
*/
|
|
||||||
|
|
||||||
protected int movedRegionCleanerPeriod() {
|
|
||||||
return TIMEOUT_REGION_MOVED;
|
return TIMEOUT_REGION_MOVED;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a Chore thread to clean the moved region cache.
|
|
||||||
*/
|
|
||||||
protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
|
|
||||||
private HRegionServer regionServer;
|
|
||||||
Stoppable stoppable;
|
|
||||||
|
|
||||||
private MovedRegionsCleaner(
|
|
||||||
HRegionServer regionServer, Stoppable stoppable){
|
|
||||||
super("MovedRegionsCleaner for region " + regionServer, stoppable,
|
|
||||||
regionServer.movedRegionCleanerPeriod());
|
|
||||||
this.regionServer = regionServer;
|
|
||||||
this.stoppable = stoppable;
|
|
||||||
}
|
|
||||||
|
|
||||||
static MovedRegionsCleaner create(HRegionServer rs){
|
|
||||||
Stoppable stoppable = new Stoppable() {
|
|
||||||
private volatile boolean isStopped = false;
|
|
||||||
@Override public void stop(String why) { isStopped = true;}
|
|
||||||
@Override public boolean isStopped() {return isStopped;}
|
|
||||||
};
|
|
||||||
|
|
||||||
return new MovedRegionsCleaner(rs, stoppable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void chore() {
|
|
||||||
regionServer.cleanMovedRegions();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop(String why) {
|
|
||||||
stoppable.stop(why);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isStopped() {
|
|
||||||
return stoppable.isStopped();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getMyEphemeralNodePath() {
|
private String getMyEphemeralNodePath() {
|
||||||
return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString());
|
return ZNodePaths.joinZNode(this.zooKeeper.getZNodePaths().rsZNode, getServerName().toString());
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test whether moved region cache is correct
|
||||||
|
*/
|
||||||
|
@Category({ MiscTests.class, MediumTests.class })
|
||||||
|
public class TestMovedRegionCache {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestMovedRegionCache.class);
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
private HBaseTestingUtility UTIL;
|
||||||
|
private MiniZooKeeperCluster zkCluster;
|
||||||
|
private HRegionServer source;
|
||||||
|
private HRegionServer dest;
|
||||||
|
private RegionInfo movedRegionInfo;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
UTIL = new HBaseTestingUtility();
|
||||||
|
zkCluster = UTIL.startMiniZKCluster();
|
||||||
|
StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
|
||||||
|
MiniHBaseCluster cluster = UTIL.startMiniHBaseCluster(option);
|
||||||
|
source = cluster.getRegionServer(0);
|
||||||
|
dest = cluster.getRegionServer(1);
|
||||||
|
assertEquals(2, cluster.getRegionServerThreads().size());
|
||||||
|
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
|
UTIL.createTable(tableName, Bytes.toBytes("cf"));
|
||||||
|
UTIL.waitTableAvailable(tableName, 30_000);
|
||||||
|
movedRegionInfo = Iterables.getOnlyElement(cluster.getRegions(tableName)).getRegionInfo();
|
||||||
|
UTIL.getAdmin().move(movedRegionInfo.getEncodedNameAsBytes(), source.getServerName());
|
||||||
|
UTIL.waitFor(2000, new Waiter.Predicate<IOException>() {
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws IOException {
|
||||||
|
return source.getOnlineRegion(movedRegionInfo.getRegionName()) != null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
if (zkCluster != null) {
|
||||||
|
zkCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMovedRegionsCache() throws IOException, InterruptedException {
|
||||||
|
UTIL.getAdmin().move(movedRegionInfo.getEncodedNameAsBytes(), dest.getServerName());
|
||||||
|
UTIL.waitFor(2000, new Waiter.Predicate<IOException>() {
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws IOException {
|
||||||
|
return dest.getOnlineRegion(movedRegionInfo.getRegionName()) != null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertNotNull("Moved region NOT in the cache!", source.getMovedRegion(
|
||||||
|
movedRegionInfo.getEncodedName()));
|
||||||
|
Thread.sleep(source.movedRegionCacheExpiredTime());
|
||||||
|
assertNull("Expired moved region exist in the cache!", source.getMovedRegion(
|
||||||
|
movedRegionInfo.getEncodedName()));
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,95 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test whether background cleanup of MovedRegion entries is happening
|
|
||||||
*/
|
|
||||||
@Category({ MiscTests.class, MediumTests.class })
|
|
||||||
public class TestMovedRegionsCleaner {
|
|
||||||
|
|
||||||
@ClassRule
|
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestMovedRegionsCleaner.class);
|
|
||||||
|
|
||||||
private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
|
||||||
|
|
||||||
public static int numCalls = 0;
|
|
||||||
|
|
||||||
private static class TestMockRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
|
|
||||||
|
|
||||||
public TestMockRegionServer(Configuration conf) throws IOException, InterruptedException {
|
|
||||||
super(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected int movedRegionCleanerPeriod() {
|
|
||||||
return 500;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override protected void cleanMovedRegions() {
|
|
||||||
// count the number of calls that are being made to this
|
|
||||||
//
|
|
||||||
numCalls++;
|
|
||||||
super.cleanMovedRegions();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@After public void after() throws Exception {
|
|
||||||
UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before public void before() throws Exception {
|
|
||||||
UTIL.getConfiguration()
|
|
||||||
.setStrings(HConstants.REGION_SERVER_IMPL, TestMockRegionServer.class.getName());
|
|
||||||
UTIL.startMiniCluster(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start the cluster, wait for some time and verify that the background
|
|
||||||
* MovedRegion cleaner indeed gets called
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
* @throws InterruptedException
|
|
||||||
*/
|
|
||||||
@Test public void testMovedRegionsCleaner() throws IOException, InterruptedException {
|
|
||||||
// We need to sleep long enough to trigger at least one round of background calls
|
|
||||||
// to MovedRegionCleaner happen. Currently the period is set to 500ms.
|
|
||||||
// Setting the sleep here for 2s just to be safe
|
|
||||||
//
|
|
||||||
UTIL.waitFor(2000, new Waiter.Predicate<IOException>() {
|
|
||||||
@Override
|
|
||||||
public boolean evaluate() throws IOException {
|
|
||||||
|
|
||||||
// verify that there was at least one call to the cleanMovedRegions function
|
|
||||||
//
|
|
||||||
return numCalls > 0;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -78,13 +78,6 @@ public class TestRSChoresScheduled {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDefaultScheduledChores() throws Exception {
|
public void testDefaultScheduledChores() throws Exception {
|
||||||
// test if movedRegionsCleaner chore is scheduled by default in HRegionServer init
|
|
||||||
TestChoreField<HRegionServer.MovedRegionsCleaner> movedRegionsCleanerTestChoreField =
|
|
||||||
new TestChoreField<>();
|
|
||||||
HRegionServer.MovedRegionsCleaner movedRegionsCleaner = movedRegionsCleanerTestChoreField
|
|
||||||
.getChoreObj("movedRegionsCleaner");
|
|
||||||
movedRegionsCleanerTestChoreField.testIfChoreScheduled(movedRegionsCleaner);
|
|
||||||
|
|
||||||
// test if compactedHFilesDischarger chore is scheduled by default in HRegionServer init
|
// test if compactedHFilesDischarger chore is scheduled by default in HRegionServer init
|
||||||
TestChoreField<CompactedHFilesDischarger> compactedHFilesDischargerTestChoreField =
|
TestChoreField<CompactedHFilesDischarger> compactedHFilesDischargerTestChoreField =
|
||||||
new TestChoreField<>();
|
new TestChoreField<>();
|
||||||
|
|
Loading…
Reference in New Issue