diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 7b077933fde..f611a2b2fbc 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -151,7 +151,18 @@ possible configurations would overwhelm and obscure the important.
so put the cleaner that prunes the most files in front. To
implement your own BaseHFileCleanerDelegate, just put it in HBase's classpath
and add the fully qualified class name here. Always add the above
- default log cleaners in the list as they will be overwritten in
+ default hfile cleaners in the list as they will be overwritten in
+ hbase-site.xml.
+
+
+ hbase.procedure.store.region.hfilecleaner.plugins
+ org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner
+ A comma-separated list of BaseHFileCleanerDelegate invoked by
+ the RegionProcedureStore HFileCleaner service. These HFiles cleaners are
+ called in order, so put the cleaner that prunes the most files in front. To
+ implement your own BaseHFileCleanerDelegate, just put it in HBase's classpath
+ and add the fully qualified class name here. Always add the above
+ default hfile cleaners in the list as they will be overwritten in
hbase-site.xml.
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
index b4888c5b162..cb31f02c429 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
@@ -123,11 +123,15 @@ public abstract class ProcedureStorePerformanceEvaluation
*/
public HFileCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem fs,
Path directory, DirScanPool pool, Map params) {
- super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
+ this("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS, pool,
params);
+
+ }
+
+ /**
+ * For creating customized HFileCleaner.
+ * @param name name of the chore being run
+ * @param period the period of time to sleep between each run
+ * @param stopper the stopper
+ * @param conf configuration to use
+ * @param fs handle to the FS
+ * @param directory directory to be cleaned
+ * @param confKey configuration key for the classes to instantiate
+ * @param pool the thread pool used to scan directories
+ * @param params params could be used in subclass of BaseHFileCleanerDelegate
+ */
+ public HFileCleaner(String name, int period, Stoppable stopper, Configuration conf, FileSystem fs,
+ Path directory, String confKey, DirScanPool pool, Map params) {
+ super(name, period, stopper, conf, fs, directory, confKey, pool, params);
throttlePoint =
- conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+ conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
largeQueueInitSize =
- conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
+ conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
smallQueueInitSize =
- conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
+ conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize, COMPARATOR);
smallFileQueue = largeFileQueue.getStealFromQueue();
largeFileDeleteThreadNumber =
- conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
+ conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
smallFileDeleteThreadNumber =
- conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
+ conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
cleanerThreadTimeoutMsec =
- conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
- cleanerThreadCheckIntervalMsec =
- conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
- DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
+ conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
+ cleanerThreadCheckIntervalMsec = conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
+ DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
startHFileDeleteThreads();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
index 8e3ffa375ef..57e62ddf654 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
@@ -120,8 +120,8 @@ class RegionFlusherAndCompactor implements Closeable {
flushThread.start();
compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
- LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, " +
- "compactMin={}", flushSize, flushPerChanges, flushIntervalMs, compactMin);
+ LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
+ flushSize, flushPerChanges, flushIntervalMs, compactMin);
}
// inject our flush related configurations
@@ -139,6 +139,7 @@ class RegionFlusherAndCompactor implements Closeable {
private void compact() {
try {
region.compact(true);
+ Iterables.getOnlyElement(region.getStores()).closeAndArchiveCompactedFiles();
} catch (IOException e) {
LOG.error("Failed to compact procedure store region", e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
index 35032a3e100..3d01b01fc49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
@@ -48,9 +48,12 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
@@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -119,7 +123,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
static final String MASTER_PROCEDURE_DIR = "MasterProcs";
- static final String LOGCLEANER_PLUGINS = "hbase.procedure.store.region.logcleaner.plugins";
+ static final String HFILECLEANER_PLUGINS = "hbase.procedure.store.region.hfilecleaner.plugins";
private static final String REPLAY_EDITS_DIR = "recovered.wals";
@@ -138,22 +142,31 @@ public class RegionProcedureStore extends ProcedureStoreBase {
private final Server server;
+ private final DirScanPool cleanerPool;
+
private final LeaseRecovery leaseRecovery;
+ // Used to delete the compacted hfiles. Since we put all data on WAL filesystem, it is not
+ // possible to move the compacted hfiles to the global hfile archive directory, we have to do it
+ // by ourselves.
+ private HFileCleaner cleaner;
+
private WALFactory walFactory;
@VisibleForTesting
HRegion region;
- private RegionFlusherAndCompactor flusherAndCompactor;
+ @VisibleForTesting
+ RegionFlusherAndCompactor flusherAndCompactor;
@VisibleForTesting
RegionProcedureStoreWALRoller walRoller;
private int numThreads;
- public RegionProcedureStore(Server server, LeaseRecovery leaseRecovery) {
+ public RegionProcedureStore(Server server, DirScanPool cleanerPool, LeaseRecovery leaseRecovery) {
this.server = server;
+ this.cleanerPool = cleanerPool;
this.leaseRecovery = leaseRecovery;
}
@@ -193,6 +206,9 @@ public class RegionProcedureStore extends ProcedureStoreBase {
return;
}
LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
+ if (cleaner != null) {
+ cleaner.cancel(abort);
+ }
if (flusherAndCompactor != null) {
flusherAndCompactor.close();
}
@@ -423,11 +439,11 @@ public class RegionProcedureStore extends ProcedureStoreBase {
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
}
+ store.stop(false);
if (!fs.delete(procWALDir, true)) {
- throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
- procWALDir);
+ throw new IOException(
+ "Failed to delete the WALProcedureStore migrated proc wal directory " + procWALDir);
}
- store.stop(true);
LOG.info("Migration of WALProcedureStore finished");
}
@@ -463,6 +479,16 @@ public class RegionProcedureStore extends ProcedureStoreBase {
}
flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
walRoller.setFlusherAndCompactor(flusherAndCompactor);
+ int cleanerInterval = conf.getInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL,
+ HMaster.DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
+ Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
+ if (!fs.mkdirs(archiveDir)) {
+ LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will" +
+ " be created again when we actually archive the hfiles later, so continue", archiveDir);
+ }
+ cleaner = new HFileCleaner("RegionProcedureStoreHFileCleaner", cleanerInterval, server, conf,
+ fs, archiveDir, HFILECLEANER_PLUGINS, cleanerPool, Collections.emptyMap());
+ server.getChoreService().scheduleChore(cleaner);
tryMigrate(fs);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
index f7e6b4ba176..fa8b1511b4f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
@@ -46,21 +47,29 @@ public class RegionProcedureStorePerformanceEvaluation
private final ServerName serverName =
ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
+ private final ChoreService choreService;
+
+ private volatile boolean abort = false;
+
public MockServer(Configuration conf) {
this.conf = conf;
+ this.choreService = new ChoreService("Cleaner-Chore-Service");
}
@Override
public void abort(String why, Throwable e) {
+ abort = true;
+ choreService.shutdown();
}
@Override
public boolean isAborted() {
- return false;
+ return abort;
}
@Override
public void stop(String why) {
+ choreService.shutdown();
}
@Override
@@ -105,10 +114,12 @@ public class RegionProcedureStorePerformanceEvaluation
@Override
public ChoreService getChoreService() {
- throw new UnsupportedOperationException();
+ return choreService;
}
}
+ private DirScanPool cleanerPool;
+
@Override
protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
Pair pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
@@ -123,7 +134,8 @@ public class RegionProcedureStorePerformanceEvaluation
initialCountPercentage, null);
conf.setBoolean(RegionProcedureStore.USE_HSYNC_KEY, "hsync".equals(syncType));
CommonFSUtils.setRootDir(conf, storeDir);
- return new RegionProcedureStore(new MockServer(conf), (fs, apth) -> {
+ cleanerPool = new DirScanPool(conf);
+ return new RegionProcedureStore(new MockServer(conf), cleanerPool, (fs, apth) -> {
});
}
@@ -138,6 +150,11 @@ public class RegionProcedureStorePerformanceEvaluation
protected void preWrite(long procId) throws IOException {
}
+ @Override
+ protected void postStop(RegionProcedureStore store) throws IOException {
+ cleanerPool.shutdownNow();
+ }
+
public static void main(String[] args) throws IOException {
RegionProcedureStorePerformanceEvaluation tool =
new RegionProcedureStorePerformanceEvaluation();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
index 6f078057267..c5694d2be87 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestBase.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.hbase.procedure2.store.region;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -32,18 +35,31 @@ public class RegionProcedureStoreTestBase {
protected RegionProcedureStore store;
+ protected ChoreService choreService;
+
+ protected DirScanPool cleanerPool;
+
+ protected void configure(Configuration conf) {
+ }
+
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+ configure(htu.getConfiguration());
Path testDir = htu.getDataTestDir();
CommonFSUtils.setWALRootDir(htu.getConfiguration(), testDir);
- store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new LoadCounter());
+ choreService = new ChoreService(getClass().getSimpleName());
+ cleanerPool = new DirScanPool(htu.getConfiguration());
+ store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
+ cleanerPool, new LoadCounter());
}
@After
public void tearDown() throws IOException {
store.stop(true);
+ cleanerPool.shutdownNow();
+ choreService.shutdown();
htu.cleanupTestDir();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
index d550e7fbe4b..5497b8a5439 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
@@ -24,8 +24,10 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
@@ -34,13 +36,14 @@ final class RegionProcedureStoreTestHelper {
private RegionProcedureStoreTestHelper() {
}
- static RegionProcedureStore createStore(Configuration conf, ProcedureLoader loader)
- throws IOException {
+ static RegionProcedureStore createStore(Configuration conf, ChoreService choreService,
+ DirScanPool cleanerPool, ProcedureLoader loader) throws IOException {
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(conf);
when(server.getServerName())
.thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
- RegionProcedureStore store = new RegionProcedureStore(server, new LeaseRecovery() {
+ when(server.getChoreService()).thenReturn(choreService);
+ RegionProcedureStore store = new RegionProcedureStore(server, cleanerPool, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java
new file mode 100644
index 00000000000..15682bb8cff
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreCompaction.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestRegionProcedureStoreCompaction extends RegionProcedureStoreTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionProcedureStoreCompaction.class);
+
+ private int compactMin = 4;
+
+ @Override
+ protected void configure(Configuration conf) {
+ conf.setInt(RegionFlusherAndCompactor.COMPACT_MIN_KEY, compactMin);
+ conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 500);
+ conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 5000);
+ }
+
+ private int getStorefilesCount() {
+ return Iterables.getOnlyElement(store.region.getStores()).getStorefilesCount();
+ }
+
+ @Test
+ public void test() throws IOException, InterruptedException {
+ for (int i = 0; i < compactMin - 1; i++) {
+ store.insert(new RegionProcedureStoreTestProcedure(), null);
+ store.region.flush(true);
+ }
+ assertEquals(compactMin - 1, getStorefilesCount());
+ store.insert(new RegionProcedureStoreTestProcedure(), null);
+ store.flusherAndCompactor.requestFlush();
+ htu.waitFor(15000, () -> getStorefilesCount() == 1);
+ Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(
+ new Path(htu.getDataTestDir(), RegionProcedureStore.MASTER_PROCEDURE_DIR),
+ store.region.getRegionInfo(), RegionProcedureStore.FAMILY);
+ FileSystem fs = storeArchiveDir.getFileSystem(htu.getConfiguration());
+ // after compaction, the old hfiles should have been compacted
+ htu.waitFor(15000, () -> {
+ try {
+ FileStatus[] fses = fs.listStatus(storeArchiveDir);
+ return fses != null && fses.length == compactMin;
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ });
+ // ttl has not expired, so should not delete any files
+ Thread.sleep(1000);
+ FileStatus[] compactedHFiles = fs.listStatus(storeArchiveDir);
+ assertEquals(4, compactedHFiles.length);
+ Thread.sleep(2000);
+ // touch one file
+ long currentTime = System.currentTimeMillis();
+ fs.setTimes(compactedHFiles[0].getPath(), currentTime, currentTime);
+ Thread.sleep(3000);
+ // only the touched file is still there after clean up
+ FileStatus[] remainingHFiles = fs.listStatus(storeArchiveDir);
+ assertEquals(1, remainingHFiles.length);
+ assertEquals(compactedHFiles[0].getPath(), remainingHFiles[0].getPath());
+ Thread.sleep(6000);
+ // the touched file should also be cleaned up and then the cleaner will delete the parent
+ // directory since it is empty.
+ assertFalse(fs.exists(storeArchiveDir));
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
index 44d7daaf6a8..9a493614a1a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
@@ -32,12 +32,14 @@ import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@@ -67,6 +69,10 @@ public class TestRegionProcedureStoreMigration {
private WALProcedureStore walStore;
+ private ChoreService choreService;
+
+ private DirScanPool cleanerPool;
+
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
@@ -83,6 +89,8 @@ public class TestRegionProcedureStoreMigration {
walStore.start(1);
walStore.recoverLease();
walStore.load(new LoadCounter());
+ choreService = new ChoreService(getClass().getSimpleName());
+ cleanerPool = new DirScanPool(htu.getConfiguration());
}
@After
@@ -91,6 +99,8 @@ public class TestRegionProcedureStoreMigration {
store.stop(true);
}
walStore.stop(true);
+ cleanerPool.shutdownNow();
+ choreService.shutdown();
htu.cleanupTestDir();
}
@@ -109,8 +119,8 @@ public class TestRegionProcedureStoreMigration {
SortedSet loadedProcs =
new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
MutableLong maxProcIdSet = new MutableLong(0);
- store =
- RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new ProcedureLoader() {
+ store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
+ cleanerPool, new ProcedureLoader() {
@Override
public void setMaxProcId(long maxProcId) {
@@ -156,7 +166,8 @@ public class TestRegionProcedureStoreMigration {
walStore.stop(true);
try {
- store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new LoadCounter());
+ store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), choreService,
+ cleanerPool, new LoadCounter());
fail("Should fail since AssignProcedure is not supported");
} catch (HBaseIOException e) {
assertThat(e.getMessage(), startsWith("Unsupported"));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
index 826c7637bb3..db499427df5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
@@ -95,7 +95,8 @@ public class TestRegionProcedureStoreWALCleaner {
}
}, conf, fs, globalWALArchiveDir, dirScanPool);
choreService.scheduleChore(logCleaner);
- store = RegionProcedureStoreTestHelper.createStore(conf, new LoadCounter());
+ store = RegionProcedureStoreTestHelper.createStore(conf, choreService, dirScanPool,
+ new LoadCounter());
}
@After