HBASE-23680 RegionProcedureStore missing cleaning of hfile archive (#1022)
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
65bcf55892
commit
167892ce64
|
@ -151,7 +151,18 @@ possible configurations would overwhelm and obscure the important.
|
|||
so put the cleaner that prunes the most files in front. To
|
||||
implement your own BaseHFileCleanerDelegate, just put it in HBase's classpath
|
||||
and add the fully qualified class name here. Always add the above
|
||||
default log cleaners in the list as they will be overwritten in
|
||||
default hfile cleaners in the list as they will be overwritten in
|
||||
hbase-site.xml.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.procedure.store.region.hfilecleaner.plugins</name>
|
||||
<value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner</value>
|
||||
<description>A comma-separated list of BaseHFileCleanerDelegate invoked by
|
||||
the RegionProcedureStore HFileCleaner service. These HFiles cleaners are
|
||||
called in order, so put the cleaner that prunes the most files in front. To
|
||||
implement your own BaseHFileCleanerDelegate, just put it in HBase's classpath
|
||||
and add the fully qualified class name here. Always add the above
|
||||
default hfile cleaners in the list as they will be overwritten in
|
||||
hbase-site.xml.</description>
|
||||
</property>
|
||||
<property>
|
||||
|
|
|
@ -123,11 +123,15 @@ public abstract class ProcedureStorePerformanceEvaluation<T extends ProcedureSto
|
|||
|
||||
protected abstract T createProcedureStore(Path storeDir) throws IOException;
|
||||
|
||||
protected void postStop(T store) throws IOException {
|
||||
}
|
||||
|
||||
private void tearDownProcedureStore() {
|
||||
Path storeDir = null;
|
||||
try {
|
||||
if (store != null) {
|
||||
store.stop(false);
|
||||
postStop(store);
|
||||
}
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
storeDir = fs.makeQualified(new Path(outputPath));
|
||||
|
|
|
@ -1413,8 +1413,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
|
||||
startProcedureExecutor();
|
||||
|
||||
// Create cleaner thread pool
|
||||
cleanerPool = new DirScanPool(conf);
|
||||
// Start log cleaner thread
|
||||
int cleanerInterval =
|
||||
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
|
||||
|
@ -1520,8 +1518,10 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
|
||||
private void createProcedureExecutor() throws IOException {
|
||||
MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
|
||||
procedureStore =
|
||||
new RegionProcedureStore(this, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
|
||||
// Create cleaner thread pool
|
||||
cleanerPool = new DirScanPool(conf);
|
||||
procedureStore = new RegionProcedureStore(this, cleanerPool,
|
||||
new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
|
||||
procedureStore.registerListener(new ProcedureStoreListener() {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -117,8 +117,26 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
|
|||
*/
|
||||
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
|
||||
Path directory, DirScanPool pool, Map<String, Object> params) {
|
||||
super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
|
||||
this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
|
||||
params);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* For creating customized HFileCleaner.
|
||||
* @param name name of the chore being run
|
||||
* @param period the period of time to sleep between each run
|
||||
* @param stopper the stopper
|
||||
* @param conf configuration to use
|
||||
* @param fs handle to the FS
|
||||
* @param directory directory to be cleaned
|
||||
* @param confKey configuration key for the classes to instantiate
|
||||
* @param pool the thread pool used to scan directories
|
||||
* @param params params could be used in subclass of BaseHFileCleanerDelegate
|
||||
*/
|
||||
public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
|
||||
Path directory, String confKey, DirScanPool pool, Map<String, Object> params) {
|
||||
super(name, period, stopper, conf, fs, directory, confKey, pool, params);
|
||||
throttlePoint =
|
||||
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
|
||||
largeQueueInitSize =
|
||||
|
@ -133,8 +151,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
|
|||
conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
|
||||
cleanerThreadTimeoutMsec =
|
||||
conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
|
||||
cleanerThreadCheckIntervalMsec =
|
||||
conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
|
||||
cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
|
||||
DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
|
||||
startHFileDeleteThreads();
|
||||
}
|
||||
|
|
|
@ -120,8 +120,8 @@ class RegionFlusherAndCompactor implements Closeable {
|
|||
flushThread.start();
|
||||
compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
|
||||
.setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
|
||||
LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, " +
|
||||
"compactMin={}", flushSize, flushPerChanges, flushIntervalMs, compactMin);
|
||||
LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
|
||||
flushSize, flushPerChanges, flushIntervalMs, compactMin);
|
||||
}
|
||||
|
||||
// inject our flush related configurations
|
||||
|
@ -139,6 +139,7 @@ class RegionFlusherAndCompactor implements Closeable {
|
|||
private void compact() {
|
||||
try {
|
||||
region.compact(true);
|
||||
Iterables.getOnlyElement(region.getStores()).closeAndArchiveCompactedFiles();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to compact procedure store region", e);
|
||||
}
|
||||
|
|
|
@ -48,9 +48,12 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
|
||||
import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
|
||||
import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
|
||||
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
|
@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
|||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
|
@ -119,7 +123,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
|
||||
static final String MASTER_PROCEDURE_DIR = "MasterProcs";
|
||||
|
||||
static final String LOGCLEANER_PLUGINS = "hbase.procedure.store.region.logcleaner.plugins";
|
||||
static final String HFILECLEANER_PLUGINS = "hbase.procedure.store.region.hfilecleaner.plugins";
|
||||
|
||||
private static final String REPLAY_EDITS_DIR = "recovered.wals";
|
||||
|
||||
|
@ -138,22 +142,31 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
|
||||
private final Server server;
|
||||
|
||||
private final DirScanPool cleanerPool;
|
||||
|
||||
private final LeaseRecovery leaseRecovery;
|
||||
|
||||
// Used to delete the compacted hfiles. Since we put all data on WAL filesystem, it is not
|
||||
// possible to move the compacted hfiles to the global hfile archive directory, we have to do it
|
||||
// by ourselves.
|
||||
private HFileCleaner cleaner;
|
||||
|
||||
private WALFactory walFactory;
|
||||
|
||||
@VisibleForTesting
|
||||
HRegion region;
|
||||
|
||||
private RegionFlusherAndCompactor flusherAndCompactor;
|
||||
@VisibleForTesting
|
||||
RegionFlusherAndCompactor flusherAndCompactor;
|
||||
|
||||
@VisibleForTesting
|
||||
RegionProcedureStoreWALRoller walRoller;
|
||||
|
||||
private int numThreads;
|
||||
|
||||
public RegionProcedureStore(Server server, LeaseRecovery leaseRecovery) {
|
||||
public RegionProcedureStore(Server server, DirScanPool cleanerPool, LeaseRecovery leaseRecovery) {
|
||||
this.server = server;
|
||||
this.cleanerPool = cleanerPool;
|
||||
this.leaseRecovery = leaseRecovery;
|
||||
}
|
||||
|
||||
|
@ -193,6 +206,9 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
return;
|
||||
}
|
||||
LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
|
||||
if (cleaner != null) {
|
||||
cleaner.cancel(abort);
|
||||
}
|
||||
if (flusherAndCompactor != null) {
|
||||
flusherAndCompactor.close();
|
||||
}
|
||||
|
@ -423,11 +439,11 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
|
||||
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
|
||||
}
|
||||
store.stop(false);
|
||||
if (!fs.delete(procWALDir, true)) {
|
||||
throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
|
||||
procWALDir);
|
||||
throw new IOException(
|
||||
"Failed to delete the WALProcedureStore migrated proc wal directory " + procWALDir);
|
||||
}
|
||||
store.stop(true);
|
||||
LOG.info("Migration of WALProcedureStore finished");
|
||||
}
|
||||
|
||||
|
@ -463,6 +479,16 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
}
|
||||
flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
|
||||
walRoller.setFlusherAndCompactor(flusherAndCompactor);
|
||||
int cleanerInterval = conf.getInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL,
|
||||
HMaster.DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
|
||||
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
|
||||
if (!fs.mkdirs(archiveDir)) {
|
||||
LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
|
||||
" be created again when we actually archive the hfiles later, so continue", archiveDir);
|
||||
}
|
||||
cleaner = new HFileCleaner("RegionProcedureStoreHFileCleaner", cleanerInterval, server, conf,
|
||||
fs, archiveDir, HFILECLEANER_PLUGINS, cleanerPool, Collections.emptyMap());
|
||||
server.getChoreService().scheduleChore(cleaner);
|
||||
tryMigrate(fs);
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
|
||||
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
|
||||
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
|
||||
|
@ -46,21 +47,29 @@ public class RegionProcedureStorePerformanceEvaluation
|
|||
private final ServerName serverName =
|
||||
ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
|
||||
|
||||
private final ChoreService choreService;
|
||||
|
||||
private volatile boolean abort = false;
|
||||
|
||||
public MockServer(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.choreService = new ChoreService("Cleaner-Chore-Service");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
abort = true;
|
||||
choreService.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return false;
|
||||
return abort;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(String why) {
|
||||
choreService.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -105,10 +114,12 @@ public class RegionProcedureStorePerformanceEvaluation
|
|||
|
||||
@Override
|
||||
public ChoreService getChoreService() {
|
||||
throw new UnsupportedOperationException();
|
||||
return choreService;
|
||||
}
|
||||
}
|
||||
|
||||
private DirScanPool cleanerPool;
|
||||
|
||||
@Override
|
||||
protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
|
||||
Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
|
||||
|
@ -123,7 +134,8 @@ public class RegionProcedureStorePerformanceEvaluation
|
|||
initialCountPercentage, null);
|
||||
conf.setBoolean(RegionProcedureStore.USE_HSYNC_KEY, "hsync".equals(syncType));
|
||||
CommonFSUtils.setRootDir(conf, storeDir);
|
||||
return new RegionProcedureStore(new MockServer(conf), (fs, apth) -> {
|
||||
cleanerPool = new DirScanPool(conf);
|
||||
return new RegionProcedureStore(new MockServer(conf), cleanerPool, (fs, apth) -> {
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -138,6 +150,11 @@ public class RegionProcedureStorePerformanceEvaluation
|
|||
protected void preWrite(long procId) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postStop(RegionProcedureStore store) throws IOException {
|
||||
cleanerPool.shutdownNow();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
RegionProcedureStorePerformanceEvaluation tool =
|
||||
new RegionProcedureStorePerformanceEvaluation();
|
||||
|
|
|
@ -18,8 +18,11 @@
|
|||
package org.apache.hadoop.hbase.procedure2.store.region;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ChoreService;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
|
@ -32,18 +35,31 @@ public class RegionProcedureStoreTestBase {
|
|||
|
||||
protected RegionProcedureStore store;
|
||||
|
||||
protected ChoreService choreService;
|
||||
|
||||
protected DirScanPool cleanerPool;
|
||||
|
||||
protected void configure(Configuration conf) {
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
htu = new HBaseCommonTestingUtility();
|
||||
htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
|
||||
configure(htu.getConfiguration());
|
||||
Path testDir = htu.getDataTestDir();
|
||||
CommonFSUtils.setWALRootDir(htu.getConfiguration(), testDir);
|
||||
store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new LoadCounter());
|
||||
choreService = new ChoreService(getClass().getSimpleName());
|
||||
cleanerPool = new DirScanPool(htu.getConfiguration());
|
||||
store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
|
||||
cleanerPool, new LoadCounter());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
store.stop(true);
|
||||
cleanerPool.shutdownNow();
|
||||
choreService.shutdown();
|
||||
htu.cleanupTestDir();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,8 +24,10 @@ import java.io.IOException;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ChoreService;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
|
||||
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
|
||||
|
||||
|
@ -34,13 +36,14 @@ final class RegionProcedureStoreTestHelper {
|
|||
private RegionProcedureStoreTestHelper() {
|
||||
}
|
||||
|
||||
static RegionProcedureStore createStore(Configuration conf, ProcedureLoader loader)
|
||||
throws IOException {
|
||||
static RegionProcedureStore createStore(Configuration conf, ChoreService choreService,
|
||||
DirScanPool cleanerPool, ProcedureLoader loader) throws IOException {
|
||||
Server server = mock(Server.class);
|
||||
when(server.getConfiguration()).thenReturn(conf);
|
||||
when(server.getServerName())
|
||||
.thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
|
||||
RegionProcedureStore store = new RegionProcedureStore(server, new LeaseRecovery() {
|
||||
when(server.getChoreService()).thenReturn(choreService);
|
||||
RegionProcedureStore store = new RegionProcedureStore(server, cleanerPool, new LeaseRecovery() {
|
||||
|
||||
@Override
|
||||
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.procedure2.store.region;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||
|
||||
@Category({ MasterTests.class, MediumTests.class })
|
||||
public class TestRegionProcedureStoreCompaction extends RegionProcedureStoreTestBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionProcedureStoreCompaction.class);
|
||||
|
||||
private int compactMin = 4;
|
||||
|
||||
@Override
|
||||
protected void configure(Configuration conf) {
|
||||
conf.setInt(RegionFlusherAndCompactor.COMPACT_MIN_KEY, compactMin);
|
||||
conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 500);
|
||||
conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 5000);
|
||||
}
|
||||
|
||||
private int getStorefilesCount() {
|
||||
return Iterables.getOnlyElement(store.region.getStores()).getStorefilesCount();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws IOException, InterruptedException {
|
||||
for (int i = 0; i < compactMin - 1; i++) {
|
||||
store.insert(new RegionProcedureStoreTestProcedure(), null);
|
||||
store.region.flush(true);
|
||||
}
|
||||
assertEquals(compactMin - 1, getStorefilesCount());
|
||||
store.insert(new RegionProcedureStoreTestProcedure(), null);
|
||||
store.flusherAndCompactor.requestFlush();
|
||||
htu.waitFor(15000, () -> getStorefilesCount() == 1);
|
||||
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
|
||||
new Path(htu.getDataTestDir(), RegionProcedureStore.MASTER_PROCEDURE_DIR),
|
||||
store.region.getRegionInfo(), RegionProcedureStore.FAMILY);
|
||||
FileSystem fs = storeArchiveDir.getFileSystem(htu.getConfiguration());
|
||||
// after compaction, the old hfiles should have been compacted
|
||||
htu.waitFor(15000, () -> {
|
||||
try {
|
||||
FileStatus[] fses = fs.listStatus(storeArchiveDir);
|
||||
return fses != null && fses.length == compactMin;
|
||||
} catch (FileNotFoundException e) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
// ttl has not expired, so should not delete any files
|
||||
Thread.sleep(1000);
|
||||
FileStatus[] compactedHFiles = fs.listStatus(storeArchiveDir);
|
||||
assertEquals(4, compactedHFiles.length);
|
||||
Thread.sleep(2000);
|
||||
// touch one file
|
||||
long currentTime = System.currentTimeMillis();
|
||||
fs.setTimes(compactedHFiles[0].getPath(), currentTime, currentTime);
|
||||
Thread.sleep(3000);
|
||||
// only the touched file is still there after clean up
|
||||
FileStatus[] remainingHFiles = fs.listStatus(storeArchiveDir);
|
||||
assertEquals(1, remainingHFiles.length);
|
||||
assertEquals(compactedHFiles[0].getPath(), remainingHFiles[0].getPath());
|
||||
Thread.sleep(6000);
|
||||
// the touched file should also be cleaned up and then the cleaner will delete the parent
|
||||
// directory since it is empty.
|
||||
assertFalse(fs.exists(storeArchiveDir));
|
||||
}
|
||||
}
|
|
@ -32,12 +32,14 @@ import org.apache.commons.lang3.mutable.MutableLong;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ChoreService;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
|
||||
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
|
||||
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
|
||||
|
@ -67,6 +69,10 @@ public class TestRegionProcedureStoreMigration {
|
|||
|
||||
private WALProcedureStore walStore;
|
||||
|
||||
private ChoreService choreService;
|
||||
|
||||
private DirScanPool cleanerPool;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
htu = new HBaseCommonTestingUtility();
|
||||
|
@ -83,6 +89,8 @@ public class TestRegionProcedureStoreMigration {
|
|||
walStore.start(1);
|
||||
walStore.recoverLease();
|
||||
walStore.load(new LoadCounter());
|
||||
choreService = new ChoreService(getClass().getSimpleName());
|
||||
cleanerPool = new DirScanPool(htu.getConfiguration());
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -91,6 +99,8 @@ public class TestRegionProcedureStoreMigration {
|
|||
store.stop(true);
|
||||
}
|
||||
walStore.stop(true);
|
||||
cleanerPool.shutdownNow();
|
||||
choreService.shutdown();
|
||||
htu.cleanupTestDir();
|
||||
}
|
||||
|
||||
|
@ -109,8 +119,8 @@ public class TestRegionProcedureStoreMigration {
|
|||
SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
|
||||
new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
|
||||
MutableLong maxProcIdSet = new MutableLong(0);
|
||||
store =
|
||||
RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new ProcedureLoader() {
|
||||
store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
|
||||
cleanerPool, new ProcedureLoader() {
|
||||
|
||||
@Override
|
||||
public void setMaxProcId(long maxProcId) {
|
||||
|
@ -156,7 +166,8 @@ public class TestRegionProcedureStoreMigration {
|
|||
walStore.stop(true);
|
||||
|
||||
try {
|
||||
store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new LoadCounter());
|
||||
store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
|
||||
cleanerPool, new LoadCounter());
|
||||
fail("Should fail since AssignProcedure is not supported");
|
||||
} catch (HBaseIOException e) {
|
||||
assertThat(e.getMessage(), startsWith("Unsupported"));
|
||||
|
|
|
@ -95,7 +95,8 @@ public class TestRegionProcedureStoreWALCleaner {
|
|||
}
|
||||
}, conf, fs, globalWALArchiveDir, dirScanPool);
|
||||
choreService.scheduleChore(logCleaner);
|
||||
store = RegionProcedureStoreTestHelper.createStore(conf, new LoadCounter());
|
||||
store = RegionProcedureStoreTestHelper.createStore(conf, choreService, dirScanPool,
|
||||
new LoadCounter());
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
Loading…
Reference in New Issue