diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index a5e87ae0eb9..47b0228a319 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;

 import org.apache.hadoop.conf.Configuration;
@@ -76,6 +77,16 @@ public class HFileCleaner extends CleanerChore {
     "hbase.regionserver.hfilecleaner.small.thread.count";
   public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;

+  public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC =
+      "hbase.regionserver.hfilecleaner.thread.timeout.msec";
+  @VisibleForTesting
+  static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L;
+
+  public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC =
+      "hbase.regionserver.hfilecleaner.thread.check.interval.msec";
+  @VisibleForTesting
+  static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
+
   private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);

   StealJobQueue largeFileQueue;
@@ -85,6 +96,8 @@ public class HFileCleaner extends CleanerChore {
   private int smallQueueInitSize;
   private int largeFileDeleteThreadNumber;
   private int smallFileDeleteThreadNumber;
+  private long cleanerThreadTimeoutMsec;
+  private long cleanerThreadCheckIntervalMsec;
   private List threads = new ArrayList();
   private boolean running;

@@ -115,6 +128,11 @@ public class HFileCleaner extends CleanerChore {
       conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
     smallFileDeleteThreadNumber = conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER,
       DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
+    cleanerThreadTimeoutMsec =
+        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
+    cleanerThreadCheckIntervalMsec =
+        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
+          DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
     startHFileDeleteThreads();
   }

@@ -146,7 +164,7 @@ public class HFileCleaner extends CleanerChore {
     }
     // wait for each submitted task to finish
     for (HFileDeleteTask task : tasks) {
-      if (task.getResult()) {
+      if (task.getResult(cleanerThreadCheckIntervalMsec)) {
        deletedFiles++;
      }
    }
@@ -159,7 +177,7 @@ public class HFileCleaner extends CleanerChore {
    * @return HFileDeleteTask to track progress
    */
   private HFileDeleteTask deleteFile(FileStatus file) {
-    HFileDeleteTask task = new HFileDeleteTask(file);
+    HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec);
     boolean enqueued = dispatch(task);
     return enqueued ? task : null;
   }
@@ -300,17 +318,17 @@ public class HFileCleaner extends CleanerChore {
   };

   private static final class HFileDeleteTask {
-    private static final long MAX_WAIT = 60 * 1000L;
-    private static final long WAIT_UNIT = 1000L;
     boolean done = false;
     boolean result;
     final Path filePath;
     final long fileLength;
+    final long timeoutMsec;

-    public HFileDeleteTask(FileStatus file) {
+    public HFileDeleteTask(FileStatus file, long timeoutMsec) {
       this.filePath = file.getPath();
       this.fileLength = file.getLen();
+      this.timeoutMsec = timeoutMsec;
     }

     public synchronized void setResult(boolean result) {
@@ -319,17 +337,19 @@ public class HFileCleaner extends CleanerChore {
       notify();
     }

-    public synchronized boolean getResult() {
-      long waitTime = 0;
+    public synchronized boolean getResult(long waitIfNotFinished) {
+      long waitTimeMsec = 0;
       try {
         while (!done) {
-          wait(WAIT_UNIT);
-          waitTime += WAIT_UNIT;
+          long startTimeNanos = System.nanoTime();
+          wait(waitIfNotFinished);
+          waitTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
+            TimeUnit.NANOSECONDS);
           if (done) {
             return this.result;
           }
-          if (waitTime > MAX_WAIT) {
-            LOG.warn("Wait more than " + MAX_WAIT + " ms for deleting " + this.filePath
+          if (waitTimeMsec > timeoutMsec) {
+            LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath
               + ", exit...");
             return false;
           }
         }
@@ -373,6 +393,16 @@ public class HFileCleaner extends CleanerChore {
     return throttlePoint;
   }

+  @VisibleForTesting
+  long getCleanerThreadTimeoutMsec() {
+    return cleanerThreadTimeoutMsec;
+  }
+
+  @VisibleForTesting
+  long getCleanerThreadCheckIntervalMsec() {
+    return cleanerThreadCheckIntervalMsec;
+  }
+
   @Override
   public void onConfigurationChange(Configuration conf) {
     super.onConfigurationChange(conf);
@@ -443,6 +473,19 @@ public class HFileCleaner extends CleanerChore {
       this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
       updated = true;
     }
+    long cleanerThreadTimeoutMsec =
+        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
+    if (cleanerThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) {
+      this.cleanerThreadTimeoutMsec = cleanerThreadTimeoutMsec;
+      updated = true;
+    }
+    long cleanerThreadCheckIntervalMsec =
+        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
+          DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
+    if (cleanerThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) {
+      this.cleanerThreadCheckIntervalMsec = cleanerThreadCheckIntervalMsec;
+      updated = true;
+    }
     return updated;
   }
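Note on the change above: the cleaner replaces the hard-coded MAX_WAIT / WAIT_UNIT constants with configurable values and measures the time actually spent waiting via System.nanoTime() instead of summing the nominal wait slices, since wait() can return early on notify() or a spurious wakeup. The standalone sketch below is illustrative only and not part of the patch; the class and member names are made up, but the loop mirrors what HFileDeleteTask.getResult(long) does after this change.

import java.util.concurrent.TimeUnit;

/** Illustrative only: a result holder that waits in bounded, configurable slices. */
final class TimedResult {
  private final long timeoutMsec;   // total time we are willing to wait for a result
  private boolean done = false;
  private boolean result;

  TimedResult(long timeoutMsec) {
    this.timeoutMsec = timeoutMsec;
  }

  synchronized void setResult(boolean result) {
    this.result = result;
    this.done = true;
    notify();                       // wake up the waiter in getResult
  }

  synchronized boolean getResult(long checkIntervalMsec) {
    long waitedMsec = 0;
    try {
      while (!done) {
        long start = System.nanoTime();
        wait(checkIntervalMsec);    // wake up at least once per check interval
        waitedMsec += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        if (done) {
          return result;
        }
        if (waitedMsec > timeoutMsec) {
          return false;             // give up once the configured timeout is exceeded
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return result;
  }
}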
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index bc27991b6a7..db098e26b47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -46,11 +47,24 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 public class LogCleaner extends CleanerChore {
   private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class.getName());

-  public static final String OLD_WALS_CLEANER_SIZE = "hbase.oldwals.cleaner.thread.size";
-  public static final int OLD_WALS_CLEANER_DEFAULT_SIZE = 2;
+  public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
+  public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;
+
+  public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
+      "hbase.oldwals.cleaner.thread.timeout.msec";
+  @VisibleForTesting
+  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;
+
+  public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC =
+      "hbase.oldwals.cleaner.thread.check.interval.msec";
+  @VisibleForTesting
+  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500L;
+
   private final LinkedBlockingQueue pendingDelete;
   private List oldWALsCleaner;
+  private long cleanerThreadTimeoutMsec;
+  private long cleanerThreadCheckIntervalMsec;

   /**
    * @param period the period of time to sleep between each run
@@ -63,8 +77,12 @@ public class LogCleaner extends CleanerChore {
       Path oldLogDir) {
     super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS);
     this.pendingDelete = new LinkedBlockingQueue<>();
-    int size = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
+    int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
     this.oldWALsCleaner = createOldWalsCleaner(size);
+    this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
+        DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
+    this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
+        DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
   }

   @Override
@@ -77,7 +95,7 @@ public class LogCleaner extends CleanerChore {
   public void onConfigurationChange(Configuration conf) {
     super.onConfigurationChange(conf);

-    int newSize = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
+    int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
     if (newSize == oldWALsCleaner.size()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Size from configuration is the same as previous which is " +
@@ -87,13 +105,18 @@ public class LogCleaner extends CleanerChore {
     }
     interruptOldWALsCleaner();
     oldWALsCleaner = createOldWalsCleaner(newSize);
+    cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
+        DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
+    cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
+        DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
   }

   @Override
   protected int deleteFiles(Iterable filesToDelete) {
     List results = new LinkedList<>();
     for (FileStatus toDelete : filesToDelete) {
-      CleanerContext context = CleanerContext.createCleanerContext(toDelete);
+      CleanerContext context = CleanerContext.createCleanerContext(toDelete,
+        cleanerThreadTimeoutMsec);
       if (context != null) {
         pendingDelete.add(context);
         results.add(context);
       }
@@ -102,7 +125,7 @@

     int deletedFiles = 0;
     for (CleanerContext res : results) {
-      deletedFiles += res.getResult(500) ? 1 : 0;
+      deletedFiles += res.getResult(cleanerThreadCheckIntervalMsec) ? 1 : 0;
     }
     return deletedFiles;
   }
@@ -118,6 +141,16 @@ public class LogCleaner extends CleanerChore {
     return oldWALsCleaner.size();
   }

+  @VisibleForTesting
+  long getCleanerThreadTimeoutMsec() {
+    return cleanerThreadTimeoutMsec;
+  }
+
+  @VisibleForTesting
+  long getCleanerThreadCheckIntervalMsec() {
+    return cleanerThreadCheckIntervalMsec;
+  }
+
   private List createOldWalsCleaner(int size) {
     LOG.info("Creating OldWALs cleaners with size=" + size);

@@ -186,20 +219,20 @@ public class LogCleaner extends CleanerChore {
   }

   private static final class CleanerContext {
-    // At most waits 60 seconds
-    static final long MAX_WAIT = 60 * 1000;
     final FileStatus target;
     volatile boolean result;
     volatile boolean setFromCleaner = false;
+    long timeoutMsec;

-    static CleanerContext createCleanerContext(FileStatus status) {
-      return status != null ? new CleanerContext(status) : null;
+    static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) {
+      return status != null ? new CleanerContext(status, timeoutMsec) : null;
     }

-    private CleanerContext(FileStatus status) {
+    private CleanerContext(FileStatus status, long timeoutMsec) {
       this.target = status;
       this.result = false;
+      this.timeoutMsec = timeoutMsec;
     }

     synchronized void setResult(boolean res) {
@@ -209,13 +242,15 @@ public class LogCleaner extends CleanerChore {
     }

     synchronized boolean getResult(long waitIfNotFinished) {
-      long totalTime = 0;
+      long totalTimeMsec = 0;
       try {
         while (!setFromCleaner) {
+          long startTimeNanos = System.nanoTime();
           wait(waitIfNotFinished);
-          totalTime += waitIfNotFinished;
-          if (totalTime >= MAX_WAIT) {
-            LOG.warn("Spend too much time to delete oldwals " + target);
+          totalTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
+            TimeUnit.NANOSECONDS);
+          if (totalTimeMsec >= timeoutMsec) {
+            LOG.warn("Spend too much time " + totalTimeMsec + " ms to delete oldwals " + target);
             return result;
           }
         }
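LogCleaner's CleanerContext now follows the same timed-wait pattern, with the timeout and check interval read from the Configuration, so both cleaners can be tuned without code changes. Below is a minimal sketch of programmatic tuning that assumes only the public constants introduced by this patch; the class name and the chosen values are illustrative, not recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;

public class CleanerTimeoutTuningExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Abandon an HFile delete wait after 2 minutes, polling for completion every 500 ms.
    conf.setLong(HFileCleaner.HFILE_DELETE_THREAD_TIMEOUT_MSEC, 2 * 60 * 1000L);
    conf.setLong(HFileCleaner.HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC, 500L);
    // The oldWALs cleaner exposes the same two knobs under its own property names.
    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 2 * 60 * 1000L);
    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, 250L);
    System.out.println("oldwals timeout = "
        + conf.getLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, -1) + " ms");
  }
}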
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 465e1932a95..9da4df4aa28 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -352,6 +352,8 @@ public class TestHFileCleaner {
     final int SMALL_FILE_NUM = 20;
     final int LARGE_THREAD_NUM = 2;
     final int SMALL_THREAD_NUM = 4;
+    final long THREAD_TIMEOUT_MSEC = 30 * 1000L;
+    final long THREAD_CHECK_INTERVAL_MSEC = 500L;

     Configuration conf = UTIL.getConfiguration();
     // no cleaner policies = delete all files
@@ -369,6 +371,10 @@ public class TestHFileCleaner {
     Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
     Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
     Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
+    Assert.assertEquals(HFileCleaner.DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC,
+      cleaner.getCleanerThreadTimeoutMsec());
+    Assert.assertEquals(HFileCleaner.DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
+      cleaner.getCleanerThreadCheckIntervalMsec());

     // clean up archive directory and create files for testing
     fs.delete(archivedHfileDir, true);
@@ -396,6 +402,10 @@ public class TestHFileCleaner {
     newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
     newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_THREAD_NUMBER, LARGE_THREAD_NUM);
     newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_THREAD_NUMBER, SMALL_THREAD_NUM);
+    newConf.setLong(HFileCleaner.HFILE_DELETE_THREAD_TIMEOUT_MSEC, THREAD_TIMEOUT_MSEC);
+    newConf.setLong(HFileCleaner.HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
+      THREAD_CHECK_INTERVAL_MSEC);
+
     LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
         + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
     cleaner.onConfigurationChange(newConf);
@@ -405,6 +415,8 @@ public class TestHFileCleaner {
     Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
     Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
     Assert.assertEquals(LARGE_THREAD_NUM + SMALL_THREAD_NUM, cleaner.getCleanerThreads().size());
+    Assert.assertEquals(THREAD_TIMEOUT_MSEC, cleaner.getCleanerThreadTimeoutMsec());
+    Assert.assertEquals(THREAD_CHECK_INTERVAL_MSEC, cleaner.getCleanerThreadCheckIntervalMsec());

     // make sure no cost when onConfigurationChange called with no change
     List oldThreads = cleaner.getCleanerThreads();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 9f14e81ab30..9a6ca79cd63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -267,14 +267,23 @@ public class TestLogsCleaner {
   @Test
   public void testOnConfigurationChange() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE);
+    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE,
+        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
+    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
+        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
+    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
+        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
     // Prepare environments
     Server server = new DummyServer();
     Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
         HConstants.HREGION_OLDLOGDIR_NAME);
     FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
     LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir);
-    assertEquals(LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE, cleaner.getSizeOfCleaners());
+    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners());
+    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
+        cleaner.getCleanerThreadTimeoutMsec());
+    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
+        cleaner.getCleanerThreadCheckIntervalMsec());
     // Create dir and files for test
     fs.delete(oldWALsDir, true);
     fs.mkdirs(oldWALsDir);
@@ -288,9 +297,16 @@ public class TestLogsCleaner {
     thread.start();
     // change size of cleaners dynamically
     int sizeToChange = 4;
-    conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, sizeToChange);
+    long threadTimeoutToChange = 30 * 1000L;
+    long threadCheckIntervalToChange = 250L;
+    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange);
+    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
+    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
+        threadCheckIntervalToChange);
     cleaner.onConfigurationChange(conf);
     assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
+    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
+    assertEquals(threadCheckIntervalToChange, cleaner.getCleanerThreadCheckIntervalMsec());
     // Stop chore
     thread.join();
     status = fs.listStatus(oldWALsDir);
diff --git a/src/main/asciidoc/_chapters/configuration.adoc b/src/main/asciidoc/_chapters/configuration.adoc
index 174aa80f2b0..113058c0b4f 100644
--- a/src/main/asciidoc/_chapters/configuration.adoc
+++ b/src/main/asciidoc/_chapters/configuration.adoc
@@ -1071,6 +1071,8 @@ Here are those configurations:
 | hbase.regionserver.hfilecleaner.small.queue.size
 | hbase.regionserver.hfilecleaner.large.thread.count
 | hbase.regionserver.hfilecleaner.small.thread.count
+| hbase.regionserver.hfilecleaner.thread.timeout.msec
+| hbase.regionserver.hfilecleaner.thread.check.interval.msec
 | hbase.regionserver.flush.throughput.controller
 | hbase.hstore.compaction.max.size
 | hbase.hstore.compaction.max.size.offpeak
@@ -1091,6 +1093,8 @@ Here are those configurations:
 | hbase.offpeak.start.hour
 | hbase.offpeak.end.hour
 | hbase.oldwals.cleaner.thread.size
+| hbase.oldwals.cleaner.thread.timeout.msec
+| hbase.oldwals.cleaner.thread.check.interval.msec
 | hbase.procedure.worker.keep.alive.time.msec
 | hbase.procedure.worker.add.stuck.percentage
 | hbase.procedure.worker.monitor.interval.msec
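Because the documentation hunk above adds the four new properties to the list of configurations that support online reload, an operator would normally set them in hbase-site.xml and push the change to running servers (for example with the HBase shell's update_config / update_all_config commands) rather than restarting. A hypothetical hbase-site.xml fragment using the patch's default values:

<property>
  <name>hbase.regionserver.hfilecleaner.thread.timeout.msec</name>
  <value>60000</value>
</property>
<property>
  <name>hbase.regionserver.hfilecleaner.thread.check.interval.msec</name>
  <value>1000</value>
</property>
<property>
  <name>hbase.oldwals.cleaner.thread.timeout.msec</name>
  <value>60000</value>
</property>
<property>
  <name>hbase.oldwals.cleaner.thread.check.interval.msec</name>
  <value>500</value>
</property>

Keeping the check interval well below the timeout keeps the wait loop responsive without busy-waiting.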