From 9facfa550f1e7386be3a04d84f7e8013f5002965 Mon Sep 17 00:00:00 2001
From: Yu Li
Date: Sat, 1 Apr 2017 10:59:11 +0800
Subject: [PATCH] HBASE-17215 Separate small/large file delete threads in
 HFileCleaner to accelerate archived hfile cleanup speed

---
 .../apache/hadoop/hbase/master/HMaster.java       |   1 +
 .../hbase/master/cleaner/CleanerChore.java        |  14 +-
 .../hbase/master/cleaner/HFileCleaner.java        | 313 +++++++++++++++++-
 .../hbase/regionserver/RSRpcServices.java         |   3 +-
 .../master/cleaner/TestHFileCleaner.java          | 152 +++++++++
 5 files changed, 478 insertions(+), 5 deletions(-)
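Design note: the cleaner previously deleted archived hfiles one by one on the chore
thread (the inline loop in CleanerChore.checkAndDeleteFiles below), so a handful of
large files could stall thousands of small ones. This patch instead dispatches each
file into one of two bounded queues keyed on a size threshold
(hbase.regionserver.thread.hfilecleaner.throttle, default 64MB), each drained by its
own daemon thread. A minimal, self-contained sketch of that scheme follows; the
class and method names here are illustrative only, not HBase API:

// Sketch only: two bounded queues split by a size threshold, one consumer each.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SizeSplitDeleter {
  private static final long THROTTLE_POINT = 64L * 1024 * 1024; // patch default: 64M

  private final BlockingQueue<Long> largeQueue = new LinkedBlockingQueue<Long>(1024);
  private final BlockingQueue<Long> smallQueue = new LinkedBlockingQueue<Long>(1024);

  public SizeSplitDeleter() {
    startConsumer(largeQueue, "large");
    startConsumer(smallQueue, "small");
  }

  /** Non-blocking enqueue; a false return means "queue full, retry next round". */
  public boolean dispatch(long fileLength) {
    BlockingQueue<Long> q = (fileLength >= THROTTLE_POINT) ? largeQueue : smallQueue;
    return q.offer(fileLength);
  }

  private void startConsumer(final BlockingQueue<Long> queue, final String name) {
    Thread t = new Thread() {
      @Override
      public void run() {
        try {
          while (true) {
            Long len = queue.take(); // blocks until work arrives
            System.out.println(name + " consumer deleting file of " + len + " bytes");
          }
        } catch (InterruptedException e) {
          // interruption doubles as the shutdown signal, as in the patch
        }
      }
    };
    t.setDaemon(true);
    t.setName("deleter-" + name);
    t.start();
  }
}

This keeps small-file cleanup from head-of-line blocking behind large files while
the chore thread itself never blocks on a slow consumer.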
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index a1cbe53295b..bb9f282acb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -885,6 +885,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     status.markComplete("Initialization successful");
     LOG.info("Master has completed initialization");
     configurationManager.registerObserver(this.balancer);
+    configurationManager.registerObserver(this.hfileCleaner);
 
     // Set master as 'initialized'.
     setInitialized(true);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index dddad36a96c..825feba2181 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -46,7 +46,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
 
   private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
 
-  private final FileSystem fs;
+  protected final FileSystem fs;
   private final Path oldFileDir;
   private final Configuration conf;
   protected List<T> cleanersChain;
@@ -269,6 +269,15 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
     }
 
     Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
+    return deleteFiles(filesToDelete) == files.size();
+  }
+
+  /**
+   * Delete the given files
+   * @param filesToDelete files to delete
+   * @return number of deleted files
+   */
+  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
     int deletedFileCount = 0;
     for (FileStatus file : filesToDelete) {
       Path filePath = file.getPath();
@@ -289,8 +298,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
         LOG.warn("Error while deleting: " + filePath, e);
       }
     }
-
-    return deletedFileCount == files.size();
+    return deletedFileCount;
   }
 
   @Override
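The CleanerChore change is a small template-method refactor: the delete loop that
used to live inline in checkAndDeleteFiles becomes a protected deleteFiles hook that
returns a count, so the boolean contract of checkAndDeleteFiles ("did everything get
deleted?") is preserved while a subclass can swap in its own deletion strategy. The
shape, as a sketch with stand-in types (FileRef here plays the role of FileStatus):

import java.util.List;

abstract class ChoreSketch {
  static class FileRef { }

  boolean checkAndDeleteFiles(List<FileRef> files) {
    // contract unchanged: true only if every candidate was deleted
    return deleteFiles(files) == files.size();
  }

  // Subclasses override to change *how* deletion happens (e.g. async queues),
  // not *whether* a file is deletable.
  protected int deleteFiles(Iterable<FileRef> filesToDelete) {
    int deleted = 0;
    for (FileRef f : filesToDelete) {
      deleted++; // stands in for a synchronous fs.delete() that succeeded
    }
    return deleted;
  }
}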
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 89c316b2e45..3a68252c834 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -17,22 +17,33 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * This Chore, every time it runs, will clear the HFiles in the hfile archive
  * folder that are deletable for each HFile cleaner in the chain.
  */
 @InterfaceAudience.Private
-public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
+public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
+    ConfigurationObserver {
 
   public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
@@ -41,6 +52,34 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
     this(period, stopper, conf, fs, directory, null);
   }
 
+  // Configuration key for large/small throttle point
+  public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
+      "hbase.regionserver.thread.hfilecleaner.throttle";
+  public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
+
+  // Configuration key for large queue size
+  public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
+      "hbase.regionserver.hfilecleaner.large.queue.size";
+  public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576;
+
+  // Configuration key for small queue size
+  public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
+      "hbase.regionserver.hfilecleaner.small.queue.size";
+  public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576;
+
+  private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
+
+  BlockingQueue<HFileDeleteTask> largeFileQueue;
+  BlockingQueue<HFileDeleteTask> smallFileQueue;
+  private int throttlePoint;
+  private int largeQueueSize;
+  private int smallQueueSize;
+  private List<Thread> threads = new ArrayList<Thread>();
+  private boolean running;
+
+  private long deletedLargeFiles = 0L;
+  private long deletedSmallFiles = 0L;
+
   /**
    * @param period the period of time to sleep between each run
    * @param stopper the stopper
@@ -53,6 +92,15 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
       Path directory, Map<String, Object> params) {
     super("HFileCleaner", period, stopper, conf, fs, directory, MASTER_HFILE_CLEANER_PLUGINS,
       params);
+    throttlePoint =
+        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+    largeQueueSize =
+        conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
+    smallQueueSize =
+        conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+    largeFileQueue = new LinkedBlockingQueue<HFileDeleteTask>(largeQueueSize);
+    smallFileQueue = new LinkedBlockingQueue<HFileDeleteTask>(smallQueueSize);
+    startHFileDeleteThreads();
   }
 
   @Override
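Both queues are bounded and fed with offer() rather than put() (see dispatch() in
the next hunk), so the chore thread never blocks behind a slow consumer: a full
queue simply means the task is dropped for this round and the file is retried on the
next chore run. The offer contract in isolation, using plain java.util.concurrent
and nothing HBase-specific:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class OfferDemo {
  public static void main(String[] args) {
    BlockingQueue<String> q = new LinkedBlockingQueue<String>(1); // capacity 1
    System.out.println(q.offer("a")); // true: enqueued
    System.out.println(q.offer("b")); // false: full; returns immediately, never blocks
  }
}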
@@ -69,4 +117,267 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
   public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
     return this.cleanersChain;
   }
+
+  @Override
+  public int deleteFiles(Iterable<FileStatus> filesToDelete) {
+    int deletedFiles = 0;
+    List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
+    // construct delete tasks and add into relative queue
+    for (FileStatus file : filesToDelete) {
+      HFileDeleteTask task = deleteFile(file);
+      if (task != null) {
+        tasks.add(task);
+      }
+    }
+    // wait for each submitted task to finish
+    for (HFileDeleteTask task : tasks) {
+      if (task.getResult()) {
+        deletedFiles++;
+      }
+    }
+    return deletedFiles;
+  }
+
+  /**
+   * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
+   * @param file the file to delete
+   * @return HFileDeleteTask to track progress
+   */
+  private HFileDeleteTask deleteFile(FileStatus file) {
+    HFileDeleteTask task = new HFileDeleteTask(file);
+    boolean enqueued = dispatch(task);
+    return enqueued ? task : null;
+  }
+
+  private boolean dispatch(HFileDeleteTask task) {
+    if (task.fileLength >= this.throttlePoint) {
+      if (!this.largeFileQueue.offer(task)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Large file deletion queue is full");
+        }
+        return false;
+      }
+    } else {
+      if (!this.smallFileQueue.offer(task)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Small file deletion queue is full");
+        }
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void cleanup() {
+    super.cleanup();
+    stopHFileDeleteThreads();
+  }
+
+  /**
+   * Start threads for hfile deletion
+   */
+  private void startHFileDeleteThreads() {
+    final String n = Thread.currentThread().getName();
+    running = true;
+    // start thread for large file deletion
+    Thread large = new Thread() {
+      @Override
+      public void run() {
+        consumerLoop(largeFileQueue);
+      }
+    };
+    large.setDaemon(true);
+    large.setName(n + "-HFileCleaner.large-" + System.currentTimeMillis());
+    large.start();
+    LOG.debug("Starting hfile cleaner for large files: " + large.getName());
+    threads.add(large);
+
+    // start thread for small file deletion
+    Thread small = new Thread() {
+      @Override
+      public void run() {
+        consumerLoop(smallFileQueue);
+      }
+    };
+    small.setDaemon(true);
+    small.setName(n + "-HFileCleaner.small-" + System.currentTimeMillis());
+    small.start();
+    LOG.debug("Starting hfile cleaner for small files: " + small.getName());
+    threads.add(small);
+  }
+
+  protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
+    try {
+      while (running) {
+        HFileDeleteTask task = null;
+        try {
+          task = queue.take();
+        } catch (InterruptedException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Interrupted while trying to take a task from queue", e);
+          }
+          break;
+        }
+        if (task != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Removing: " + task.filePath + " from archive");
+          }
+          boolean succeed;
+          try {
+            succeed = this.fs.delete(task.filePath, false);
+          } catch (IOException e) {
+            LOG.warn("Failed to delete file " + task.filePath, e);
+            succeed = false;
+          }
+          task.setResult(succeed);
+          if (succeed) {
+            countDeletedFiles(queue == largeFileQueue);
+          }
+        }
+      }
+    } finally {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exit thread: " + Thread.currentThread());
+      }
+    }
+  }
+
+  // Currently only for testing purposes
+  private void countDeletedFiles(boolean isLarge) {
+    if (isLarge) {
+      if (deletedLargeFiles == Long.MAX_VALUE) {
+        LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
+        deletedLargeFiles = 0L;
+      }
+      deletedLargeFiles++;
+    } else {
+      if (deletedSmallFiles == Long.MAX_VALUE) {
+        LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
+        deletedSmallFiles = 0L;
+      }
+      deletedSmallFiles++;
+    }
+  }
+
+  /**
+   * Stop threads for hfile deletion
+   */
+  private void stopHFileDeleteThreads() {
+    running = false;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stopping file delete threads");
+    }
+    for (Thread thread : threads) {
+      thread.interrupt();
+    }
+  }
+
+  static class HFileDeleteTask {
+    private static final long MAX_WAIT = 60 * 1000L;
+    private static final long WAIT_UNIT = 1000L;
+
+    boolean done = false;
+    boolean result;
+    final Path filePath;
+    final long fileLength;
+
+    public HFileDeleteTask(FileStatus file) {
+      this.filePath = file.getPath();
+      this.fileLength = file.getLen();
+    }
+
+    public synchronized void setResult(boolean result) {
+      this.done = true;
+      this.result = result;
+      notify();
+    }
+
+    public synchronized boolean getResult() {
+      long waitTime = 0;
+      try {
+        while (!done) {
+          wait(WAIT_UNIT);
+          waitTime += WAIT_UNIT;
+          if (done) {
+            return this.result;
+          }
+          if (waitTime > MAX_WAIT) {
+            LOG.warn("Wait more than " + MAX_WAIT + " ms for deleting " + this.filePath
+                + ", exit...");
+            return false;
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for result of deleting " + filePath
+            + ", will return false", e);
+        return false;
+      }
+      return this.result;
+    }
+  }
+
+  @VisibleForTesting
+  public List<Thread> getCleanerThreads() {
+    return threads;
+  }
+
+  @VisibleForTesting
+  public long getNumOfDeletedLargeFiles() {
+    return deletedLargeFiles;
+  }
+
+  @VisibleForTesting
+  public long getNumOfDeletedSmallFiles() {
+    return deletedSmallFiles;
+  }
+
+  @VisibleForTesting
+  public long getLargeQueueSize() {
+    return largeQueueSize;
+  }
+
+  @VisibleForTesting
+  public long getSmallQueueSize() {
+    return smallQueueSize;
+  }
+
+  @VisibleForTesting
+  public long getThrottlePoint() {
+    return throttlePoint;
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Updating configuration for HFileCleaner, previous throttle point: ")
+        .append(throttlePoint).append(", largeQueueSize: ").append(largeQueueSize)
+        .append(", smallQueueSize: ").append(smallQueueSize);
+    stopHFileDeleteThreads();
+    this.throttlePoint =
+        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+    this.largeQueueSize =
+        conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
+    this.smallQueueSize =
+        conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+    // record the leftover tasks
+    List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
+    for (HFileDeleteTask task : largeFileQueue) {
+      leftOverTasks.add(task);
+    }
+    for (HFileDeleteTask task : smallFileQueue) {
+      leftOverTasks.add(task);
+    }
+    largeFileQueue = new LinkedBlockingQueue<HFileDeleteTask>(largeQueueSize);
+    smallFileQueue = new LinkedBlockingQueue<HFileDeleteTask>(smallQueueSize);
+    threads.clear();
+    builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueSize: ")
+        .append(largeQueueSize).append(", smallQueueSize: ").append(smallQueueSize);
+    LOG.debug(builder.toString());
+    startHFileDeleteThreads();
+    // re-dispatch the leftover tasks
+    for (HFileDeleteTask task : leftOverTasks) {
+      dispatch(task);
+    }
+  }
 }
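HFileDeleteTask above is a hand-rolled single-use completion latch: the chore thread
parks in getResult() in one-second wait() slices and gives up after 60s, while a
consumer thread reports through setResult()/notify(). For comparison only, not as a
drop-in for the patch, the same shape with java.util.concurrent:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class DeleteTask {
  private final CountDownLatch done = new CountDownLatch(1);
  private volatile boolean result;

  void setResult(boolean r) {
    result = r;
    done.countDown(); // wakes the waiter, like notify() in the patch
  }

  boolean getResult(long maxWaitMs) throws InterruptedException {
    // false on timeout mirrors the patch's MAX_WAIT bail-out
    return done.await(maxWaitMs, TimeUnit.MILLISECONDS) && result;
  }
}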
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 298f538b25b..8d4ea4d76f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1295,7 +1295,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       throw new RegionServerStoppedException("File system not available");
     }
     if (!regionServer.isOnline()) {
-      throw new ServerNotRunningYetException("Server is not running yet");
+      throw new ServerNotRunningYetException("Server " + regionServer.serverName
+          + " is not running yet");
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 60497016515..8e8a4ddbb22 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -22,10 +22,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
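Because HMaster now registers the cleaner with its ConfigurationManager (the HMaster
hunk above), the three new keys can be re-applied to a running master through the
online configuration update path; onConfigurationChange stops the consumers, records
leftover tasks, rebuilds the queues, restarts the threads, and re-dispatches the
leftovers, as exercised by testOnConfigurationChange below. Tuning the knobs
programmatically would look roughly like this (values illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;

public class TuneCleaner {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // files >= 32M go to the large-file delete thread (patch default: 64M)
    conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, 32 * 1024 * 1024);
    // cap pending delete tasks per queue (patch default: 1048576)
    conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, 4096);
    conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, 4096);
  }
}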
@@ -260,4 +263,153 @@ public class TestHFileCleaner {
       return null;
     }
   }
+
+  @Test
+  public void testThreadCleanup() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+    Server server = new DummyServer();
+    Path archivedHfileDir =
+        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+    // setup the cleaner
+    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+    // clean up archive directory
+    fs.delete(archivedHfileDir, true);
+    fs.mkdirs(archivedHfileDir);
+    // create some file to delete
+    fs.createNewFile(new Path(archivedHfileDir, "dfd-dfd"));
+    // launch the chore
+    cleaner.chore();
+    // call cleanup
+    cleaner.cleanup();
+    // wait a while for the threads to die
+    Thread.sleep(100);
+    for (Thread thread : cleaner.getCleanerThreads()) {
+      Assert.assertFalse(thread.isAlive());
+    }
+  }
+
+  @Test
+  public void testLargeSmallIsolation() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    // no cleaner policies = delete all files
+    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+    conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, 512 * 1024);
+    Server server = new DummyServer();
+    Path archivedHfileDir =
+        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+    // setup the cleaner
+    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+    // clean up archive directory
+    fs.delete(archivedHfileDir, true);
+    fs.mkdirs(archivedHfileDir);
+    // necessary setup
+    final int LARGE_FILE_NUM = 5;
+    final int SMALL_FILE_NUM = 20;
+    createFilesForTesting(LARGE_FILE_NUM, SMALL_FILE_NUM, fs, archivedHfileDir);
+    // call cleanup
+    cleaner.chore();
+
+    Assert.assertEquals(LARGE_FILE_NUM, cleaner.getNumOfDeletedLargeFiles());
+    Assert.assertEquals(SMALL_FILE_NUM, cleaner.getNumOfDeletedSmallFiles());
+  }
+
+  @Test(timeout = 60 * 1000)
+  public void testOnConfigurationChange() throws Exception {
+    // constants
+    final int ORIGINAL_THROTTLE_POINT = 512 * 1024;
+    final int ORIGINAL_QUEUE_SIZE = 512;
+    final int UPDATE_THROTTLE_POINT = 1024;// small enough to change large/small check
+    final int UPDATE_QUEUE_SIZE = 1024;
+    final int LARGE_FILE_NUM = 5;
+    final int SMALL_FILE_NUM = 20;
+
+    Configuration conf = UTIL.getConfiguration();
+    // no cleaner policies = delete all files
+    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+    conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT);
+    conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+    conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+    Server server = new DummyServer();
+    Path archivedHfileDir =
+        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+    // setup the cleaner
+    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+    final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+    Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
+    Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize());
+    Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize());
+
+    // clean up archive directory and create files for testing
+    fs.delete(archivedHfileDir, true);
+    fs.mkdirs(archivedHfileDir);
+    createFilesForTesting(LARGE_FILE_NUM, SMALL_FILE_NUM, fs, archivedHfileDir);
+
+    // call cleaner, run as daemon to test the interrupt-at-middle case
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        cleaner.chore();
+      }
+    };
+    t.setDaemon(true);
+    t.start();
+    // let the cleaner run for a while
+    Thread.sleep(20);
+
+    // trigger configuration change
+    Configuration newConf = new Configuration(conf);
+    newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
+    newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
+    newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
+    cleaner.onConfigurationChange(newConf);
+    LOG.debug("Files deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
+        + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+
+    // check values after change
+    Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
+    Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize());
+    Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize());
+    Assert.assertEquals(2, cleaner.getCleanerThreads().size());
+
+    // wait until clean done and check
+    t.join();
+    LOG.debug("Files deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
+        + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+    // After the throttle point drops to 1KB, the remaining 1KB "small" files get
+    // re-dispatched to the large queue (dispatch checks fileLength >= throttlePoint),
+    // so the large counter must exceed LARGE_FILE_NUM and the small one must stay
+    // below SMALL_FILE_NUM.
+    Assert.assertTrue("Should delete more than " + LARGE_FILE_NUM
+        + " files from large queue but actually " + cleaner.getNumOfDeletedLargeFiles(),
+        cleaner.getNumOfDeletedLargeFiles() > LARGE_FILE_NUM);
+    Assert.assertTrue("Should delete less than " + SMALL_FILE_NUM
+        + " files from small queue but actually " + cleaner.getNumOfDeletedSmallFiles(),
+        cleaner.getNumOfDeletedSmallFiles() < SMALL_FILE_NUM);
+  }
+
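+  // Helper used by the tests above: writes largeFileNum files of 1MB of random
+  // bytes and smallFileNum files of 1KB, so with the 512KB test throttle the two
+  // batches land on different queues.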
+  private void createFilesForTesting(int largeFileNum, int smallFileNum, FileSystem fs,
+      Path archivedHfileDir) throws IOException {
+    final Random rand = new Random();
+    final byte[] large = new byte[1024 * 1024];
+    for (int i = 0; i < large.length; i++) {
+      large[i] = (byte) rand.nextInt(128);
+    }
+    final byte[] small = new byte[1024];
+    for (int i = 0; i < small.length; i++) {
+      small[i] = (byte) rand.nextInt(128);
+    }
+    // create large and small files
+    for (int i = 1; i <= largeFileNum; i++) {
+      FSDataOutputStream out = fs.create(new Path(archivedHfileDir, "large-file-" + i));
+      out.write(large);
+      out.close();
+    }
+    for (int i = 1; i <= smallFileNum; i++) {
+      FSDataOutputStream out = fs.create(new Path(archivedHfileDir, "small-file-" + i));
+      out.write(small);
+      out.close();
+    }
+  }
 }