From cbcbcf4dcd3401327cc36173f3ca8e5362da1e0c Mon Sep 17 00:00:00 2001 From: Yu Li Date: Wed, 5 Apr 2017 17:53:21 +0800 Subject: [PATCH] HBASE-17854 Use StealJobQueue in HFileCleaner after HBASE-17215 --- .../hbase/master/cleaner/HFileCleaner.java | 98 ++++++++++++------- .../hadoop/hbase/util/StealJobQueue.java | 22 +++++ .../master/cleaner/TestHFileCleaner.java | 28 +++--- .../hadoop/hbase/util/TestStealJobQueue.java | 2 +- 4 files changed, 102 insertions(+), 48 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java index 3a68252c834..8b3515a791d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.util.StealJobQueue; import com.google.common.annotations.VisibleForTesting; /** @@ -57,23 +57,23 @@ public class HFileCleaner extends CleanerChore impleme "hbase.regionserver.thread.hfilecleaner.throttle"; public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M - // Configuration key for large queue size - public final static String LARGE_HFILE_DELETE_QUEUE_SIZE = + // Configuration key for large queue initial size + public final static String LARGE_HFILE_QUEUE_INIT_SIZE = "hbase.regionserver.hfilecleaner.large.queue.size"; - public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576; + public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240; - // Configuration key for small queue size - public final static String SMALL_HFILE_DELETE_QUEUE_SIZE = + // Configuration key for small queue initial size + public final static String SMALL_HFILE_QUEUE_INIT_SIZE = "hbase.regionserver.hfilecleaner.small.queue.size"; - public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576; + public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240; private static final Log LOG = LogFactory.getLog(HFileCleaner.class); - BlockingQueue largeFileQueue; + StealJobQueue largeFileQueue; BlockingQueue smallFileQueue; private int throttlePoint; - private int largeQueueSize; - private int smallQueueSize; + private int largeQueueInitSize; + private int smallQueueInitSize; private List threads = new ArrayList(); private boolean running; @@ -94,12 +94,12 @@ public class HFileCleaner extends CleanerChore impleme directory, MASTER_HFILE_CLEANER_PLUGINS, params); throttlePoint = conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); - largeQueueSize = - conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE); - smallQueueSize = - conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE); - largeFileQueue = new LinkedBlockingQueue(largeQueueSize); - smallFileQueue = new LinkedBlockingQueue(smallQueueSize); + largeQueueInitSize = + conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); + smallQueueInitSize = + conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); + largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize); + smallFileQueue = largeFileQueue.getStealFromQueue(); startHFileDeleteThreads(); } @@ -152,6 +152,7 @@ public class HFileCleaner extends CleanerChore impleme private boolean dispatch(HFileDeleteTask task) { if (task.fileLength >= this.throttlePoint) { if (!this.largeFileQueue.offer(task)) { + // should never arrive here as long as we use PriorityQueue if (LOG.isTraceEnabled()) { LOG.trace("Large file deletion queue is full"); } @@ -159,6 +160,7 @@ public class HFileCleaner extends CleanerChore impleme } } else { if (!this.smallFileQueue.offer(task)) { + // should never arrive here as long as we use PriorityQueue if (LOG.isTraceEnabled()) { LOG.trace("Small file deletion queue is full"); } @@ -232,7 +234,7 @@ public class HFileCleaner extends CleanerChore impleme } task.setResult(succeed); if (succeed) { - countDeletedFiles(queue == largeFileQueue); + countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue); } } } @@ -244,8 +246,8 @@ public class HFileCleaner extends CleanerChore impleme } // Currently only for testing purpose - private void countDeletedFiles(boolean isLarge) { - if (isLarge) { + private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) { + if (isLargeFile) { if (deletedLargeFiles == Long.MAX_VALUE) { LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0"); deletedLargeFiles = 0L; @@ -256,6 +258,9 @@ public class HFileCleaner extends CleanerChore impleme LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0"); deletedSmallFiles = 0L; } + if (fromLargeQueue && LOG.isTraceEnabled()) { + LOG.trace("Stolen a small file deletion task in large file thread"); + } deletedSmallFiles++; } } @@ -273,7 +278,7 @@ public class HFileCleaner extends CleanerChore impleme } } - static class HFileDeleteTask { + static class HFileDeleteTask implements Comparable { private static final long MAX_WAIT = 60 * 1000L; private static final long WAIT_UNIT = 1000L; @@ -315,6 +320,31 @@ public class HFileCleaner extends CleanerChore impleme } return this.result; } + + @Override + public int compareTo(HFileDeleteTask o) { + long sub = this.fileLength - o.fileLength; + // smaller value with higher priority in PriorityQueue, and we intent to delete the larger + // file first. + return (sub > 0) ? -1 : (sub < 0 ? 1 : 0); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || !(o instanceof HFileDeleteTask)) { + return false; + } + HFileDeleteTask otherTask = (HFileDeleteTask) o; + return this.filePath.equals(otherTask.filePath) && (this.fileLength == otherTask.fileLength); + } + + @Override + public int hashCode() { + return filePath.hashCode(); + } } @VisibleForTesting @@ -333,13 +363,13 @@ public class HFileCleaner extends CleanerChore impleme } @VisibleForTesting - public long getLargeQueueSize() { - return largeQueueSize; + public long getLargeQueueInitSize() { + return largeQueueInitSize; } @VisibleForTesting - public long getSmallQueueSize() { - return smallQueueSize; + public long getSmallQueueInitSize() { + return smallQueueInitSize; } @VisibleForTesting @@ -351,15 +381,15 @@ public class HFileCleaner extends CleanerChore impleme public void onConfigurationChange(Configuration conf) { StringBuilder builder = new StringBuilder(); builder.append("Updating configuration for HFileCleaner, previous throttle point: ") - .append(throttlePoint).append(", largeQueueSize: ").append(largeQueueSize) - .append(", smallQueueSize: ").append(smallQueueSize); + .append(throttlePoint).append(", largeQueueInitSize: ").append(largeQueueInitSize) + .append(", smallQueueInitSize: ").append(smallQueueInitSize); stopHFileDeleteThreads(); this.throttlePoint = conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); - this.largeQueueSize = - conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE); - this.smallQueueSize = - conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE); + this.largeQueueInitSize = + conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE); + this.smallQueueInitSize = + conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); // record the left over tasks List leftOverTasks = new ArrayList<>(); for (HFileDeleteTask task : largeFileQueue) { @@ -368,11 +398,11 @@ public class HFileCleaner extends CleanerChore impleme for (HFileDeleteTask task : smallFileQueue) { leftOverTasks.add(task); } - largeFileQueue = new LinkedBlockingQueue(largeQueueSize); - smallFileQueue = new LinkedBlockingQueue(smallQueueSize); + largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize); + smallFileQueue = largeFileQueue.getStealFromQueue(); threads.clear(); - builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueSize: ") - .append(largeQueueSize).append(", smallQueueSize: ").append(smallQueueSize); + builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueInitSize: ") + .append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize); LOG.debug(builder.toString()); startHFileDeleteThreads(); // re-dispatch the left over tasks diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java index 74f07477b90..5e7e232d604 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java @@ -48,6 +48,7 @@ public class StealJobQueue extends PriorityBlockingQueue { public StealJobQueue() { this.stealFromQueue = new PriorityBlockingQueue() { + @Override public boolean offer(T t) { lock.lock(); @@ -61,6 +62,27 @@ public class StealJobQueue extends PriorityBlockingQueue { }; } + public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) { + super(initCapacity); + this.stealFromQueue = new PriorityBlockingQueue(stealFromQueueInitCapacity) { + + @Override + public boolean offer(T t) { + lock.lock(); + try { + notEmpty.signal(); + return super.offer(t); + } finally { + lock.unlock(); + } + } + }; + } + + /** + * Get a queue whose job might be stolen by the consumer of this original queue + * @return the queue whose job could be stolen + */ public BlockingQueue getStealFromQueue() { return stealFromQueue; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java index 8e8a4ddbb22..5f054884d57 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java @@ -322,9 +322,9 @@ public class TestHFileCleaner { public void testOnConfigurationChange() throws Exception { // constants final int ORIGINAL_THROTTLE_POINT = 512 * 1024; - final int ORIGINAL_QUEUE_SIZE = 512; + final int ORIGINAL_QUEUE_INIT_SIZE = 512; final int UPDATE_THROTTLE_POINT = 1024;// small enough to change large/small check - final int UPDATE_QUEUE_SIZE = 1024; + final int UPDATE_QUEUE_INIT_SIZE = 1024; final int LARGE_FILE_NUM = 5; final int SMALL_FILE_NUM = 20; @@ -332,8 +332,8 @@ public class TestHFileCleaner { // no cleaner policies = delete all files conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, ""); conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT); - conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE); - conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE); + conf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE); + conf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE); Server server = new DummyServer(); Path archivedHfileDir = new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY); @@ -342,8 +342,8 @@ public class TestHFileCleaner { FileSystem fs = UTIL.getDFSCluster().getFileSystem(); final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir); Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint()); - Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize()); - Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize()); + Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize()); + Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize()); // clean up archive directory and create files for testing fs.delete(archivedHfileDir, true); @@ -359,22 +359,24 @@ public class TestHFileCleaner { }; t.setDaemon(true); t.start(); - // let the cleaner run for some while - Thread.sleep(20); + // wait until file clean started + while (cleaner.getNumOfDeletedSmallFiles() == 0) { + Thread.yield(); + } // trigger configuration change Configuration newConf = new Configuration(conf); newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT); - newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE); - newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE); - cleaner.onConfigurationChange(newConf); + newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE); + newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE); LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles() + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles()); + cleaner.onConfigurationChange(newConf); // check values after change Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint()); - Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize()); - Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize()); + Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize()); + Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize()); Assert.assertEquals(2, cleaner.getCleanerThreads().size()); // wait until clean done and check diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java index b35f6f4772b..54fdacab0f4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestStealJobQueue.java @@ -38,7 +38,7 @@ import static org.junit.Assert.*; public class TestStealJobQueue { StealJobQueue stealJobQueue; - BlockingQueue stealFromQueue; + BlockingQueue stealFromQueue; @Before public void setup() {