HBASE-27752: Update the list of prefetched files upon region movement (#5194)
Co-authored-by: Shanmukha Kota <skota@cloudera.com>
parent 78610decaa
commit ece8d014af
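In short: when a region is moved off a server, or closed during a graceful stop, the persisted list of prefetched files must be updated so it stays consistent with what is actually cached. PrefetchExecutor now records completions and cancellations by file name, BucketCache exposes whether cache persistence is configured, and HRegion gains a close() overload that skips block eviction on a graceful stop when persistence is on. A new test class, TestBlockEvictionOnRegionMovement, covers both paths.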
@@ -123,7 +123,7 @@ public final class PrefetchExecutor {
   public static void complete(Path path) {
     prefetchFutures.remove(path);
     prefetchCompleted.put(path.getName(), true);
-    LOG.debug("Prefetch completed for {}", path);
+    LOG.debug("Prefetch completed for {}", path.getName());
   }
 
   public static void cancel(Path path) {
@@ -134,7 +134,8 @@ public final class PrefetchExecutor {
       prefetchFutures.remove(path);
       LOG.debug("Prefetch cancelled for {}", path);
     }
-    prefetchCompleted.remove(path.getName());
+    LOG.debug("Removing filename from the prefetched persistence list: {}", path.getName());
+    removePrefetchedFileWhileEvict(path.getName());
   }
 
   public static boolean isCompleted(Path path) {
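For orientation, the two hunks above keep PrefetchExecutor's bookkeeping keyed by file name (path.getName()), so the persisted prefetch list can follow a file across region moves and be trimmed on eviction. A minimal sketch of that pattern follows; the registry class and method signatures here are illustrative, only prefetchCompleted and removePrefetchedFileWhileEvict(path.getName()) come from the patch itself.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: prefetch completion tracked per file name, dropped again when the
// file's blocks are evicted. Illustrative stand-in, not the HBase internals.
final class PrefetchListSketch {
  private static final Map<String, Boolean> prefetchCompleted = new ConcurrentHashMap<>();

  static void complete(String fileName) {
    prefetchCompleted.put(fileName, true);
  }

  static void cancel(String fileName) {
    // Mirrors the patched cancel(): the name leaves the persisted list
    // through the eviction path rather than a bare map removal.
    prefetchCompleted.remove(fileName);
  }

  static boolean isCompleted(String fileName) {
    return prefetchCompleted.getOrDefault(fileName, false);
  }
}

The next hunk adds the BucketCache guard that the close() path consumes later in this commit.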
@@ -427,6 +427,10 @@ public class BucketCache implements BlockCache, HeapSize {
     }
   }
 
+  public boolean isCachePersistenceEnabled() {
+    return (prefetchedFileListPath != null) && (persistencePath != null);
+  }
+
   /**
    * Cache the block with the specified name and buffer.
    * @param cacheKey block's cache key
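isCachePersistenceEnabled() returns true only when both persistence paths are set. A hedged sketch of the configuration that enables it, using the same settings the new test below applies; the paths are placeholders, and the mapping of each key to the corresponding BucketCache field is an inference from the test setup.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

// Sketch: the two settings that let isCachePersistenceEnabled() return true.
public final class CachePersistenceSetupSketch {
  static Configuration enableCachePersistence(Configuration conf) {
    // Presumed to back persistencePath: the persisted bucket cache file.
    conf.set("hbase.bucketcache.persistent.path", "/tmp/bucket.persistence");
    // Presumed to back prefetchedFileListPath: the persisted prefetch list.
    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch.persistence");
    return conf;
  }
}

The HRegion changes below consume this guard when a region is closed during a graceful stop.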
@@ -135,7 +135,9 @@ import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -1597,13 +1599,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
-   * Close down this HRegion. Flush the cache unless abort parameter is true, Shut down each HStore,
-   * don't service any more calls. This method could take some time to execute, so don't call it
-   * from a time-sensitive thread.
+   * Close this HRegion.
    * @param abort true if server is aborting (only during testing)
-   * @param ignoreStatus true if ignore the status (wont be showed on task list)
+   * @param ignoreStatus true if ignore the status (won't be showed on task list)
    * @return Vector of all the storage files that the HRegion's component HStores make use of. It's
-   *         a list of StoreFile objects. Can be null if we are not to close at this time or we are
+   *         a list of StoreFile objects. Can be null if we are not to close at this time, or we are
    *         already closed.
    * @throws IOException e
    * @throws DroppedSnapshotException Thrown when replay of wal is required because a Snapshot was
@@ -1612,6 +1612,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus)
     throws IOException {
+    return close(abort, ignoreStatus, false);
+  }
+
+  /**
+   * Close down this HRegion. Flush the cache unless abort parameter is true, Shut down each HStore,
+   * don't service any more calls. This method could take some time to execute, so don't call it
+   * from a time-sensitive thread.
+   * @param abort true if server is aborting (only during testing)
+   * @param ignoreStatus true if ignore the status (wont be showed on task list)
+   * @param isGracefulStop true if region is being closed during graceful stop and the blocks in the
+   *          BucketCache should not be evicted.
+   * @return Vector of all the storage files that the HRegion's component HStores make use of. It's
+   *         a list of StoreFile objects. Can be null if we are not to close at this time or we are
+   *         already closed.
+   * @throws IOException e
+   * @throws DroppedSnapshotException Thrown when replay of wal is required because a Snapshot was
+   *           not properly persisted. The region is put in closing mode, and
+   *           the caller MUST abort after this.
+   */
+  public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus,
+    boolean isGracefulStop) throws IOException {
     // Only allow one thread to close at a time. Serialize them so dual
     // threads attempting to close will run up against each other.
     MonitoredTask status = TaskMonitor.get().createStatus(
@@ -1620,6 +1641,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     status.setStatus("Waiting for close lock");
     try {
       synchronized (closeLock) {
+        if (isGracefulStop && rsServices != null) {
+          rsServices.getBlockCache().ifPresent(blockCache -> {
+            if (blockCache instanceof CombinedBlockCache) {
+              BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache();
+              if (l2 instanceof BucketCache) {
+                if (((BucketCache) l2).isCachePersistenceEnabled()) {
+                  LOG.info(
+                    "Closing region {} during a graceful stop, and cache persistence is on, "
+                      + "so setting evict on close to false. ",
+                    this.getRegionInfo().getRegionNameAsString());
+                  this.getStores().forEach(s -> s.getCacheConfig().setEvictOnClose(false));
+                }
+              }
+            }
+          });
+        }
         return doClose(abort, status);
       }
     } finally {
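The eviction-skip logic reaches the L2 cache by unwrapping the CombinedBlockCache. Pulled out as a standalone helper, the pattern looks like the sketch below; the helper class itself is illustrative, while getSecondLevelCache() and isCachePersistenceEnabled() are the calls the hunk above relies on.

import java.util.Optional;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

// Sketch: find a persistence-enabled L2 BucketCache, as close() does above.
final class L2CacheSketch {
  static Optional<BucketCache> persistentL2(BlockCache blockCache) {
    if (blockCache instanceof CombinedBlockCache) {
      BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache();
      if (l2 instanceof BucketCache && ((BucketCache) l2).isCachePersistenceEnabled()) {
        return Optional.of((BucketCache) l2);
      }
    }
    return Optional.empty();
  }
}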
@@ -102,7 +102,7 @@ public class CloseRegionHandler extends EventHandler {
     }
 
     // Close the region
-    if (region.close(abort) == null) {
+    if (region.close(abort, false, true) == null) {
       // This region has already been closed. Should not happen (A unit test makes this
       // happen as a side effect, TestRegionObserverInterface.testPreWALAppendNotCalledOnMetaEdit)
       LOG.warn("Can't close {}; already closed", name);
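CloseRegionHandler is the caller that passes the new flag: it now invokes the three-argument close() with isGracefulStop = true, while the eviction skip still only takes effect when the guard above finds a persistence-enabled BucketCache, so deployments without cache persistence keep the old evict-on-close behavior. The final hunk adds the new test class, TestBlockEvictionOnRegionMovement, in full.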
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.StartTestingClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestBlockEvictionOnRegionMovement {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBlockEvictionOnRegionMovement.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestBlockEvictionOnRegionMovement.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+  private Configuration conf;
+  Path testDir;
+  MiniZooKeeperCluster zkCluster;
+  SingleProcessHBaseCluster cluster;
+  StartTestingClusterOption option =
+    StartTestingClusterOption.builder().numRegionServers(2).build();
+
+  @Before
+  public void setup() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
+    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache");
+    conf.setInt("hbase.bucketcache.size", 400);
+    conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence");
+    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence");
+    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 100);
+    conf.setBoolean(CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY, true);
+    zkCluster = TEST_UTIL.startMiniZKCluster();
+    cluster = TEST_UTIL.startMiniHBaseCluster(option);
+    cluster.setConf(conf);
+  }
+
+  @Test
+  public void testBlockEvictionOnRegionMove() throws Exception {
+    // Write to table and flush
+    TableName tableRegionMove = writeDataToTable();
+
+    HRegionServer regionServingRS =
+      cluster.getRegionServer(1).getRegions(tableRegionMove).size() == 1
+        ? cluster.getRegionServer(1)
+        : cluster.getRegionServer(0);
+    assertTrue(regionServingRS.getBlockCache().isPresent());
+    long oldUsedCacheSize =
+      regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
+    assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
+
+    Admin admin = TEST_UTIL.getAdmin();
+    RegionInfo regionToMove = regionServingRS.getRegions(tableRegionMove).get(0).getRegionInfo();
+    admin.move(regionToMove.getEncodedNameAsBytes(),
+      TEST_UTIL.getOtherRegionServer(regionServingRS).getServerName());
+    assertEquals(0, regionServingRS.getRegions(tableRegionMove).size());
+
+    long newUsedCacheSize =
+      regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
+    assertTrue(oldUsedCacheSize > newUsedCacheSize);
+    assertEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
+  }
+
+  @Test
+  public void testBlockEvictionOnGracefulStop() throws Exception {
+    // Write to table and flush
+    TableName tableRegionClose = writeDataToTable();
+
+    HRegionServer regionServingRS =
+      cluster.getRegionServer(1).getRegions(tableRegionClose).size() == 1
+        ? cluster.getRegionServer(1)
+        : cluster.getRegionServer(0);
+
+    assertTrue(regionServingRS.getBlockCache().isPresent());
+    long oldUsedCacheSize =
+      regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
+    assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
+
+    cluster.stopRegionServer(regionServingRS.getServerName());
+    Thread.sleep(500);
+    cluster.startRegionServer();
+    Thread.sleep(500);
+
+    long newUsedCacheSize =
+      regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
+    assertEquals(oldUsedCacheSize, newUsedCacheSize);
+    assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
+  }
+
+  public TableName writeDataToTable() throws IOException, InterruptedException {
+    TableName tableName = TableName.valueOf("table1");
+    byte[] row0 = Bytes.toBytes("row1");
+    byte[] row1 = Bytes.toBytes("row2");
+    byte[] family = Bytes.toBytes("family");
+    byte[] qf1 = Bytes.toBytes("qf1");
+    byte[] qf2 = Bytes.toBytes("qf2");
+    byte[] value1 = Bytes.toBytes("value1");
+    byte[] value2 = Bytes.toBytes("value2");
+
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
+    Table table = TEST_UTIL.createTable(td, null);
+    try {
+      // put data
+      Put put0 = new Put(row0);
+      put0.addColumn(family, qf1, 1, value1);
+      table.put(put0);
+      Put put1 = new Put(row1);
+      put1.addColumn(family, qf2, 1, value2);
+      table.put(put1);
+      TEST_UTIL.flush(tableName);
+    } finally {
+      Thread.sleep(1000);
+    }
+    assertEquals(1, cluster.getRegions(tableName).size());
+    return tableName;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
+    if (zkCluster != null) {
+      zkCluster.shutdown();
+    }
+  }
+}
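The two tests exercise the complementary behaviors: testBlockEvictionOnRegionMove asserts that the old server's L2 cache drains to zero blocks after the region is moved away, while testBlockEvictionOnGracefulStop asserts that the cache size and block count observed before a region server stop/start are retained afterwards. Assuming the usual HBase Maven layout, a command along the lines of "mvn test -Dtest=TestBlockEvictionOnRegionMovement" run from the hbase-server module should exercise it locally.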