HBASE-17854 Use StealJobQueue in HFileCleaner after HBASE-17215

This commit is contained in:
Yu Li 2017-04-05 17:53:21 +08:00
parent a66d491892
commit cbcbcf4dcd
4 changed files with 102 additions and 48 deletions

View File

@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.StealJobQueue;
import com.google.common.annotations.VisibleForTesting;
/**
@ -57,23 +57,23 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
"hbase.regionserver.thread.hfilecleaner.throttle";
public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
// Configuration key for large queue size
public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
// Configuration key for large queue initial size
public final static String LARGE_HFILE_QUEUE_INIT_SIZE =
"hbase.regionserver.hfilecleaner.large.queue.size";
public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576;
public final static int DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE = 10240;
// Configuration key for small queue size
public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
// Configuration key for small queue initial size
public final static String SMALL_HFILE_QUEUE_INIT_SIZE =
"hbase.regionserver.hfilecleaner.small.queue.size";
public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576;
public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240;
private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
BlockingQueue<HFileDeleteTask> largeFileQueue;
StealJobQueue<HFileDeleteTask> largeFileQueue;
BlockingQueue<HFileDeleteTask> smallFileQueue;
private int throttlePoint;
private int largeQueueSize;
private int smallQueueSize;
private int largeQueueInitSize;
private int smallQueueInitSize;
private List<Thread> threads = new ArrayList<Thread>();
private boolean running;
@ -94,12 +94,12 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
directory, MASTER_HFILE_CLEANER_PLUGINS, params);
throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
largeQueueSize =
conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
smallQueueSize =
conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
largeQueueInitSize =
conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
smallQueueInitSize =
conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
smallFileQueue = largeFileQueue.getStealFromQueue();
startHFileDeleteThreads();
}
@ -152,6 +152,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
private boolean dispatch(HFileDeleteTask task) {
if (task.fileLength >= this.throttlePoint) {
if (!this.largeFileQueue.offer(task)) {
// should never arrive here as long as we use PriorityQueue
if (LOG.isTraceEnabled()) {
LOG.trace("Large file deletion queue is full");
}
@ -159,6 +160,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
} else {
if (!this.smallFileQueue.offer(task)) {
// should never arrive here as long as we use PriorityQueue
if (LOG.isTraceEnabled()) {
LOG.trace("Small file deletion queue is full");
}
@ -232,7 +234,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
task.setResult(succeed);
if (succeed) {
countDeletedFiles(queue == largeFileQueue);
countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
}
}
}
@ -244,8 +246,8 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
// Currently only for testing purpose
private void countDeletedFiles(boolean isLarge) {
if (isLarge) {
private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) {
if (isLargeFile) {
if (deletedLargeFiles == Long.MAX_VALUE) {
LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
deletedLargeFiles = 0L;
@ -256,6 +258,9 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
deletedSmallFiles = 0L;
}
if (fromLargeQueue && LOG.isTraceEnabled()) {
LOG.trace("Stolen a small file deletion task in large file thread");
}
deletedSmallFiles++;
}
}
@ -273,7 +278,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
}
static class HFileDeleteTask {
static class HFileDeleteTask implements Comparable<HFileDeleteTask> {
private static final long MAX_WAIT = 60 * 1000L;
private static final long WAIT_UNIT = 1000L;
@ -315,6 +320,31 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
return this.result;
}
@Override
public int compareTo(HFileDeleteTask o) {
long sub = this.fileLength - o.fileLength;
// smaller value with higher priority in PriorityQueue, and we intent to delete the larger
// file first.
return (sub > 0) ? -1 : (sub < 0 ? 1 : 0);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || !(o instanceof HFileDeleteTask)) {
return false;
}
HFileDeleteTask otherTask = (HFileDeleteTask) o;
return this.filePath.equals(otherTask.filePath) && (this.fileLength == otherTask.fileLength);
}
@Override
public int hashCode() {
return filePath.hashCode();
}
}
@VisibleForTesting
@ -333,13 +363,13 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
}
@VisibleForTesting
public long getLargeQueueSize() {
return largeQueueSize;
public long getLargeQueueInitSize() {
return largeQueueInitSize;
}
@VisibleForTesting
public long getSmallQueueSize() {
return smallQueueSize;
public long getSmallQueueInitSize() {
return smallQueueInitSize;
}
@VisibleForTesting
@ -351,15 +381,15 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
public void onConfigurationChange(Configuration conf) {
StringBuilder builder = new StringBuilder();
builder.append("Updating configuration for HFileCleaner, previous throttle point: ")
.append(throttlePoint).append(", largeQueueSize: ").append(largeQueueSize)
.append(", smallQueueSize: ").append(smallQueueSize);
.append(throttlePoint).append(", largeQueueInitSize: ").append(largeQueueInitSize)
.append(", smallQueueInitSize: ").append(smallQueueInitSize);
stopHFileDeleteThreads();
this.throttlePoint =
conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
this.largeQueueSize =
conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
this.smallQueueSize =
conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
this.largeQueueInitSize =
conf.getInt(LARGE_HFILE_QUEUE_INIT_SIZE, DEFAULT_LARGE_HFILE_QUEUE_INIT_SIZE);
this.smallQueueInitSize =
conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE);
// record the left over tasks
List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
for (HFileDeleteTask task : largeFileQueue) {
@ -368,11 +398,11 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> impleme
for (HFileDeleteTask task : smallFileQueue) {
leftOverTasks.add(task);
}
largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize);
smallFileQueue = largeFileQueue.getStealFromQueue();
threads.clear();
builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueSize: ")
.append(largeQueueSize).append(", smallQueueSize: ").append(smallQueueSize);
builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueInitSize: ")
.append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize);
LOG.debug(builder.toString());
startHFileDeleteThreads();
// re-dispatch the left over tasks

View File

@ -48,6 +48,7 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
public StealJobQueue() {
this.stealFromQueue = new PriorityBlockingQueue<T>() {
@Override
public boolean offer(T t) {
lock.lock();
@ -61,6 +62,27 @@ public class StealJobQueue<T> extends PriorityBlockingQueue<T> {
};
}
public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) {
super(initCapacity);
this.stealFromQueue = new PriorityBlockingQueue<T>(stealFromQueueInitCapacity) {
@Override
public boolean offer(T t) {
lock.lock();
try {
notEmpty.signal();
return super.offer(t);
} finally {
lock.unlock();
}
}
};
}
/**
* Get a queue whose job might be stolen by the consumer of this original queue
* @return the queue whose job could be stolen
*/
public BlockingQueue<T> getStealFromQueue() {
return stealFromQueue;
}

View File

@ -322,9 +322,9 @@ public class TestHFileCleaner {
public void testOnConfigurationChange() throws Exception {
// constants
final int ORIGINAL_THROTTLE_POINT = 512 * 1024;
final int ORIGINAL_QUEUE_SIZE = 512;
final int ORIGINAL_QUEUE_INIT_SIZE = 512;
final int UPDATE_THROTTLE_POINT = 1024;// small enough to change large/small check
final int UPDATE_QUEUE_SIZE = 1024;
final int UPDATE_QUEUE_INIT_SIZE = 1024;
final int LARGE_FILE_NUM = 5;
final int SMALL_FILE_NUM = 20;
@ -332,8 +332,8 @@ public class TestHFileCleaner {
// no cleaner policies = delete all files
conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT);
conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
conf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
conf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, ORIGINAL_QUEUE_INIT_SIZE);
Server server = new DummyServer();
Path archivedHfileDir =
new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
@ -342,8 +342,8 @@ public class TestHFileCleaner {
FileSystem fs = UTIL.getDFSCluster().getFileSystem();
final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize());
Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize());
Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
// clean up archive directory and create files for testing
fs.delete(archivedHfileDir, true);
@ -359,22 +359,24 @@ public class TestHFileCleaner {
};
t.setDaemon(true);
t.start();
// let the cleaner run for some while
Thread.sleep(20);
// wait until file clean started
while (cleaner.getNumOfDeletedSmallFiles() == 0) {
Thread.yield();
}
// trigger configuration change
Configuration newConf = new Configuration(conf);
newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
cleaner.onConfigurationChange(newConf);
newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
+ "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
cleaner.onConfigurationChange(newConf);
// check values after change
Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize());
Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize());
Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
Assert.assertEquals(2, cleaner.getCleanerThreads().size());
// wait until clean done and check

View File

@ -38,7 +38,7 @@ import static org.junit.Assert.*;
public class TestStealJobQueue {
StealJobQueue<Integer> stealJobQueue;
BlockingQueue stealFromQueue;
BlockingQueue<Integer> stealFromQueue;
@Before
public void setup() {