HBASE-22163 Should not archive the compacted store files when region warmup (#122)

* HBASE-22163 Should not archive the compacted store files when region warmup
This commit is contained in:
Guanghao 2019-04-08 22:11:07 +08:00 committed by GitHub
parent ad81d25a8b
commit 89ce5d17e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 228 additions and 66 deletions

View File

@ -109,9 +109,9 @@ public class TestRefreshHFilesEndpoint extends TestRefreshHFilesBase {
}
public static class HStoreWithFaultyRefreshHFilesAPI extends HStore {
public HStoreWithFaultyRefreshHFilesAPI(final HRegion region, final ColumnFamilyDescriptor family,
final Configuration confParam) throws IOException {
super(region, family, confParam);
public HStoreWithFaultyRefreshHFilesAPI(final HRegion region,
final ColumnFamilyDescriptor family, final Configuration confParam) throws IOException {
super(region, family, confParam, false);
}
@Override

View File

@ -191,7 +191,7 @@ public class CompactionTool extends Configured implements Tool {
}
};
HRegion region = new HRegion(regionFs, null, conf, htd, null);
return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf);
return new HStore(region, htd.getColumnFamily(Bytes.toBytes(familyName)), conf, false);
}
}

View File

@ -134,6 +134,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
@ -2025,13 +2026,15 @@ public class HMaster extends HRegionServer implements MasterServices {
if (this.cpHost != null) {
this.cpHost.preMove(hri, rp.getSource(), rp.getDestination());
}
TransitRegionStateProcedure proc =
this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
// Warmup the region on the destination before initiating the move. this call
// is synchronous and takes some time. doing it before the source region gets
// closed
serverManager.sendRegionWarmup(rp.getDestination(), hri);
LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
Future<byte []> future = this.assignmentManager.moveAsync(rp);
Future<byte[]> future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc);
try {
// Is this going to work? Will we throw exception on error?
// TODO: CompletableFuture rather than this stunted Future.

View File

@ -592,7 +592,7 @@ public class AssignmentManager {
return proc.getProcId();
}
private TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
ServerName targetServer) throws HBaseIOException {
RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
if (regionNode == null) {

View File

@ -104,8 +104,8 @@ public class HMobStore extends HStore {
private final byte[] refCellTags;
public HMobStore(final HRegion region, final ColumnFamilyDescriptor family,
final Configuration confParam) throws IOException {
super(region, family, confParam);
final Configuration confParam, boolean warmup) throws IOException {
super(region, family, confParam, warmup);
this.family = family;
this.mobFileCache = region.getMobFileCache();
this.homePath = MobUtils.getMobHome(conf);

View File

@ -1052,6 +1052,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
private long initializeStores(CancelableProgressable reporter, MonitoredTask status)
throws IOException {
return initializeStores(reporter, status, false);
}
private long initializeStores(CancelableProgressable reporter, MonitoredTask status,
boolean warmup) throws IOException {
// Load in all the HStores.
long maxSeqId = -1;
// initialized to -1 so that we pick up MemstoreTS from column families
@ -1069,7 +1074,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
completionService.submit(new Callable<HStore>() {
@Override
public HStore call() throws IOException {
return instantiateHStore(family);
return instantiateHStore(family, warmup);
}
});
}
@ -1129,7 +1134,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Initialize all the HStores
status.setStatus("Warming up all the Stores");
try {
initializeStores(reporter, status);
initializeStores(reporter, status, true);
} finally {
status.markComplete("Done warming up.");
}
@ -5814,17 +5819,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return true;
}
protected HStore instantiateHStore(final ColumnFamilyDescriptor family) throws IOException {
protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
throws IOException {
if (family.isMobEnabled()) {
if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
throw new IOException("A minimum HFile version of "
+ HFile.MIN_FORMAT_VERSION_WITH_TAGS
+ " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY
+ " accordingly.");
throw new IOException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS +
" is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY +
" accordingly.");
}
return new HMobStore(this, family, this.conf);
return new HMobStore(this, family, this.conf, warmup);
}
return new HStore(this, family, this.conf);
return new HStore(this, family, this.conf, warmup);
}
@Override

View File

@ -241,7 +241,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* @throws IOException
*/
protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
final Configuration confParam) throws IOException {
final Configuration confParam, boolean warmup) throws IOException {
this.fs = region.getRegionFileSystem();
@ -303,7 +303,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}
this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
List<HStoreFile> hStoreFiles = loadStoreFiles();
List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
// Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
// replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files and
// update the storeSize in the completeCompaction(..) finally (just like compaction) , so
@ -555,12 +555,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* from the given directory.
* @throws IOException
*/
private List<HStoreFile> loadStoreFiles() throws IOException {
private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException {
Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
return openStoreFiles(files);
return openStoreFiles(files, warmup);
}
private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
throws IOException {
if (CollectionUtils.isEmpty(files)) {
return Collections.emptyList();
}
@ -614,6 +615,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
throw ioe;
}
// Should not archive the compacted store files when region warmup. See HBASE-22163.
if (!warmup) {
// Remove the compacted files from result
List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
for (HStoreFile storeFile : results) {
@ -628,6 +631,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
LOG.debug("Moving the files {} to archive", filesToRemove);
this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove);
}
}
return results;
}
@ -694,7 +698,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}
// try to open the files
List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles);
List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
// propogate the file changes to the underlying store file manager
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception

View File

@ -2142,10 +2142,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return response;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Warming up Region " + region.getRegionNameAsString());
}
htd = regionServer.tableDescriptors.get(region.getTable());
if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
@ -2153,6 +2149,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return response;
}
LOG.info("Warming up region " + region.getRegionNameAsString());
HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
regionServer.getConfiguration(), regionServer, null);

View File

@ -194,17 +194,19 @@ public class TestIOFencing {
TableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
}
@Override
protected HStore instantiateHStore(final ColumnFamilyDescriptor family) throws IOException {
return new BlockCompactionsInCompletionHStore(this, family, this.conf);
protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
throws IOException {
return new BlockCompactionsInCompletionHStore(this, family, this.conf, warmup);
}
}
public static class BlockCompactionsInCompletionHStore extends HStore {
CompactionBlockerRegion r;
protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescriptor family,
Configuration confParam) throws IOException {
super(region, family, confParam);
Configuration confParam, boolean warmup) throws IOException {
super(region, family, confParam, warmup);
r = (CompactionBlockerRegion) region;
}

View File

@ -116,16 +116,17 @@ public class TestFromClientSideScanExcpetion {
}
@Override
protected HStore instantiateHStore(ColumnFamilyDescriptor family) throws IOException {
return new MyHStore(this, family, conf);
protected HStore instantiateHStore(ColumnFamilyDescriptor family, boolean warmup)
throws IOException {
return new MyHStore(this, family, conf, warmup);
}
}
public static final class MyHStore extends HStore {
public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam)
throws IOException {
super(region, family, confParam);
public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam,
boolean warmup) throws IOException {
super(region, family, confParam, warmup);
}
@Override

View File

@ -182,7 +182,7 @@ public class TestCacheOnWriteInSchema {
region = TEST_UTIL.createLocalHRegion(info, htd, walFactory.getWAL(info));
region.setBlockCache(BlockCacheFactory.createBlockCache(conf));
store = new HStore(region, hcd, conf);
store = new HStore(region, hcd, conf, false);
}
@After

View File

@ -118,7 +118,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
this.store = new HStore(region, hcd, conf);
this.store = new HStore(region, hcd, conf, false);
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));

View File

@ -107,7 +107,7 @@ public class TestCompactionPolicy {
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
store = new HStore(region, hcd, conf);
store = new HStore(region, hcd, conf, false);
TEST_FILE = region.getRegionFileSystem().createTempName();
fs.createNewFile(TEST_FILE);

View File

@ -166,7 +166,7 @@ public class TestHMobStore {
final WALFactory wals = new WALFactory(walConf, methodName);
region = new HRegion(tableDir, wals.getWAL(info), fs, conf, info, td, null);
region.setMobFileCache(new MobFileCache(conf));
store = new HMobStore(region, cfd, conf);
store = new HMobStore(region, cfd, conf, false);
if (testStore) {
init(conf, cfd);
}

View File

@ -6306,17 +6306,17 @@ public class TestHRegion {
* @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
*/
@Override
protected HStore instantiateHStore(final ColumnFamilyDescriptor family) throws IOException {
protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
throws IOException {
if (family.isMobEnabled()) {
if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
throw new IOException("A minimum HFile version of "
+ HFile.MIN_FORMAT_VERSION_WITH_TAGS
+ " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY
+ " accordingly.");
throw new IOException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS +
" is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY +
" accordingly.");
}
return new HMobStore(this, family, this.conf);
return new HMobStore(this, family, this.conf, warmup);
}
return new HStoreForTesting(this, family, this.conf);
return new HStoreForTesting(this, family, this.conf, warmup);
}
}
@ -6332,8 +6332,8 @@ public class TestHRegion {
protected HStoreForTesting(final HRegion region,
final ColumnFamilyDescriptor family,
final Configuration confParam) throws IOException {
super(region, family, confParam);
final Configuration confParam, boolean warmup) throws IOException {
super(region, family, confParam, warmup);
}
@Override

View File

@ -230,7 +230,7 @@ public class TestHStore {
ColumnFamilyDescriptor hcd, MyStoreHook hook, boolean switchToPread) throws IOException {
initHRegion(methodName, conf, builder, hcd, hook, switchToPread);
if (hook == null) {
store = new HStore(region, hcd, conf);
store = new HStore(region, hcd, conf, false);
} else {
store = new MyStore(region, hcd, conf, hook, switchToPread);
}
@ -495,7 +495,8 @@ public class TestHStore {
w.close();
this.store.close();
// Reopen it... should pick up two files
this.store = new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c);
this.store =
new HStore(this.store.getHRegion(), this.store.getColumnFamilyDescriptor(), c, false);
assertEquals(2, this.store.getStorefilesCount());
result = HBaseTestingUtility.getFromStoreFile(store,
@ -1525,7 +1526,7 @@ public class TestHStore {
ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(family);
initHRegion(name.getMethodName(), conf,
TableDescriptorBuilder.newBuilder(TableName.valueOf(table)), hcd, null, false);
HStore store = new HStore(region, hcd, conf) {
HStore store = new HStore(region, hcd, conf, false) {
@Override
protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
@ -1567,7 +1568,7 @@ public class TestHStore {
MyStore(final HRegion region, final ColumnFamilyDescriptor family, final Configuration
confParam, MyStoreHook hook, boolean switchToPread) throws IOException {
super(region, family, confParam);
super(region, family, confParam, false);
this.hook = hook;
}

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
@Category({ LargeTests.class, RegionServerTests.class })
public class TestNotCleanupCompactedFileWhenRegionWarmup {
private static final Logger LOG =
LoggerFactory.getLogger(TestNotCleanupCompactedFileWhenRegionWarmup.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNotCleanupCompactedFileWhenRegionWarmup.class);
private static HBaseTestingUtility TEST_UTIL;
private static Admin admin;
private static Table table;
private static TableName TABLE_NAME = TableName.valueOf("TestCleanupCompactedFileAfterFailover");
private static byte[] ROW = Bytes.toBytes("row");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] QUALIFIER = Bytes.toBytes("cq");
private static byte[] VALUE = Bytes.toBytes("value");
@BeforeClass
public static void beforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
// Set the scanner lease to 20min, so the scanner can't be closed by RegionServer
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1200000);
TEST_UTIL.getConfiguration()
.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
TEST_UTIL.getConfiguration().set("dfs.blocksize", "64000");
TEST_UTIL.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
TEST_UTIL.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY, "0");
TEST_UTIL.startMiniCluster(1);
admin = TEST_UTIL.getAdmin();
}
@AfterClass
public static void afterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void before() throws Exception {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
admin.createTable(builder.build());
TEST_UTIL.waitTableAvailable(TABLE_NAME);
table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
}
@After
public void after() throws Exception {
admin.disableTable(TABLE_NAME);
admin.deleteTable(TABLE_NAME);
}
@Test
public void testRegionWarmup() throws Exception {
List<HRegion> regions = new ArrayList<>();
for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
.getLiveRegionServerThreads()) {
HRegionServer rs = rsThread.getRegionServer();
if (rs.getOnlineTables().contains(TABLE_NAME)) {
regions.addAll(rs.getRegions(TABLE_NAME));
}
}
assertEquals("Table should only have one region", 1, regions.size());
HRegion region = regions.get(0);
HStore store = region.getStore(FAMILY);
writeDataAndFlush(3, region);
assertEquals(3, store.getStorefilesCount());
// Open a scanner and not close, then the storefile will be referenced
store.getScanner(new Scan(), null, 0);
region.compact(true);
assertEquals(1, store.getStorefilesCount());
// The compacted file should not be archived as there are references by user scanner
assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
HStore newStore = region.instantiateHStore(ColumnFamilyDescriptorBuilder.of(FAMILY), true);
// Should not archive the compacted storefiles when region warmup
assertEquals(4, newStore.getStorefilesCount());
newStore = region.instantiateHStore(ColumnFamilyDescriptorBuilder.of(FAMILY), false);
// Archived the compacted storefiles when region real open
assertEquals(1, newStore.getStorefilesCount());
}
private void writeDataAndFlush(int fileNum, HRegion region) throws Exception {
for (int i = 0; i < fileNum; i++) {
for (int j = 0; j < 100; j++) {
table.put(new Put(concat(ROW, j)).addColumn(FAMILY, QUALIFIER, concat(VALUE, j)));
}
region.flush(true);
}
}
private byte[] concat(byte[] base, int index) {
return Bytes.toBytes(Bytes.toString(base) + "-" + index);
}
}