HBASE-20401 Make MAX_WAIT and waitIfNotFinished in CleanerContext configurable (Contributed by Stephen Wu)
This commit is contained in:
parent
46e5baf670
commit
9d481f1faa
|
@ -23,6 +23,7 @@ import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -76,6 +77,16 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
|
||||||
"hbase.regionserver.hfilecleaner.small.thread.count";
|
"hbase.regionserver.hfilecleaner.small.thread.count";
|
||||||
public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
|
public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1;
|
||||||
|
|
||||||
|
public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC =
|
||||||
|
"hbase.regionserver.hfilecleaner.thread.timeout.msec";
|
||||||
|
@VisibleForTesting
|
||||||
|
static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L;
|
||||||
|
|
||||||
|
public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC =
|
||||||
|
"hbase.regionserver.hfilecleaner.thread.check.interval.msec";
|
||||||
|
@VisibleForTesting
|
||||||
|
static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);
|
private static final Logger LOG = LoggerFactory.getLogger(HFileCleaner.class);
|
||||||
|
|
||||||
StealJobQueue<HFileDeleteTask> largeFileQueue;
|
StealJobQueue<HFileDeleteTask> largeFileQueue;
|
||||||
|
@ -85,6 +96,8 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
|
||||||
private int smallQueueInitSize;
|
private int smallQueueInitSize;
|
||||||
private int largeFileDeleteThreadNumber;
|
private int largeFileDeleteThreadNumber;
|
||||||
private int smallFileDeleteThreadNumber;
|
private int smallFileDeleteThreadNumber;
|
||||||
|
private long cleanerThreadTimeoutMsec;
|
||||||
|
private long cleanerThreadCheckIntervalMsec;
|
||||||
private List<Thread> threads = new ArrayList<Thread>();
|
private List<Thread> threads = new ArrayList<Thread>();
|
||||||
private boolean running;
|
private boolean running;
|
||||||
|
|
||||||
|
@ -115,6 +128,11 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
|
||||||
conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
|
conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER);
|
||||||
smallFileDeleteThreadNumber =
|
smallFileDeleteThreadNumber =
|
||||||
conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
|
conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER);
|
||||||
|
cleanerThreadTimeoutMsec =
|
||||||
|
conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
|
||||||
|
cleanerThreadCheckIntervalMsec =
|
||||||
|
conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
|
||||||
|
DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
|
||||||
startHFileDeleteThreads();
|
startHFileDeleteThreads();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,7 +164,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
|
||||||
}
|
}
|
||||||
// wait for each submitted task to finish
|
// wait for each submitted task to finish
|
||||||
for (HFileDeleteTask task : tasks) {
|
for (HFileDeleteTask task : tasks) {
|
||||||
if (task.getResult()) {
|
if (task.getResult(cleanerThreadCheckIntervalMsec)) {
|
||||||
deletedFiles++;
|
deletedFiles++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -159,7 +177,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
|
||||||
* @return HFileDeleteTask to track progress
|
* @return HFileDeleteTask to track progress
|
||||||
*/
|
*/
|
||||||
private HFileDeleteTask deleteFile(FileStatus file) {
|
private HFileDeleteTask deleteFile(FileStatus file) {
|
||||||
HFileDeleteTask task = new HFileDeleteTask(file);
|
HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec);
|
||||||
boolean enqueued = dispatch(task);
|
boolean enqueued = dispatch(task);
|
||||||
return enqueued ? task : null;
|
return enqueued ? task : null;
|
||||||
}
|
}
|
||||||
|
@ -300,17 +318,17 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
|
||||||
};
|
};
|
||||||
|
|
||||||
private static final class HFileDeleteTask {
|
private static final class HFileDeleteTask {
|
||||||
private static final long MAX_WAIT = 60 * 1000L;
|
|
||||||
private static final long WAIT_UNIT = 1000L;
|
|
||||||
|
|
||||||
boolean done = false;
|
boolean done = false;
|
||||||
boolean result;
|
boolean result;
|
||||||
final Path filePath;
|
final Path filePath;
|
||||||
final long fileLength;
|
final long fileLength;
|
||||||
|
final long timeoutMsec;
|
||||||
|
|
||||||
public HFileDeleteTask(FileStatus file) {
|
public HFileDeleteTask(FileStatus file, long timeoutMsec) {
|
||||||
this.filePath = file.getPath();
|
this.filePath = file.getPath();
|
||||||
this.fileLength = file.getLen();
|
this.fileLength = file.getLen();
|
||||||
|
this.timeoutMsec = timeoutMsec;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void setResult(boolean result) {
|
public synchronized void setResult(boolean result) {
|
||||||
|
@ -319,17 +337,19 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
|
||||||
notify();
|
notify();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized boolean getResult() {
|
public synchronized boolean getResult(long waitIfNotFinished) {
|
||||||
long waitTime = 0;
|
long waitTimeMsec = 0;
|
||||||
try {
|
try {
|
||||||
while (!done) {
|
while (!done) {
|
||||||
wait(WAIT_UNIT);
|
long startTimeNanos = System.nanoTime();
|
||||||
waitTime += WAIT_UNIT;
|
wait(waitIfNotFinished);
|
||||||
|
waitTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
|
||||||
|
TimeUnit.NANOSECONDS);
|
||||||
if (done) {
|
if (done) {
|
||||||
return this.result;
|
return this.result;
|
||||||
}
|
}
|
||||||
if (waitTime > MAX_WAIT) {
|
if (waitTimeMsec > timeoutMsec) {
|
||||||
LOG.warn("Wait more than " + MAX_WAIT + " ms for deleting " + this.filePath
|
LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath
|
||||||
+ ", exit...");
|
+ ", exit...");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -373,6 +393,16 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
|
||||||
return throttlePoint;
|
return throttlePoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
long getCleanerThreadTimeoutMsec() {
|
||||||
|
return cleanerThreadTimeoutMsec;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
long getCleanerThreadCheckIntervalMsec() {
|
||||||
|
return cleanerThreadCheckIntervalMsec;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onConfigurationChange(Configuration conf) {
|
public void onConfigurationChange(Configuration conf) {
|
||||||
super.onConfigurationChange(conf);
|
super.onConfigurationChange(conf);
|
||||||
|
@ -443,6 +473,19 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
|
||||||
this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
|
this.smallFileDeleteThreadNumber = smallFileDeleteThreadNumber;
|
||||||
updated = true;
|
updated = true;
|
||||||
}
|
}
|
||||||
|
long cleanerThreadTimeoutMsec =
|
||||||
|
conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
|
||||||
|
if (cleanerThreadTimeoutMsec != this.cleanerThreadTimeoutMsec) {
|
||||||
|
this.cleanerThreadTimeoutMsec = cleanerThreadTimeoutMsec;
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
|
long cleanerThreadCheckIntervalMsec =
|
||||||
|
conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
|
||||||
|
DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
|
||||||
|
if (cleanerThreadCheckIntervalMsec != this.cleanerThreadCheckIntervalMsec) {
|
||||||
|
this.cleanerThreadCheckIntervalMsec = cleanerThreadCheckIntervalMsec;
|
||||||
|
updated = true;
|
||||||
|
}
|
||||||
return updated;
|
return updated;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.ArrayList;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -46,11 +47,24 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
||||||
public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
|
public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class.getName());
|
private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class.getName());
|
||||||
|
|
||||||
public static final String OLD_WALS_CLEANER_SIZE = "hbase.oldwals.cleaner.thread.size";
|
public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
|
||||||
public static final int OLD_WALS_CLEANER_DEFAULT_SIZE = 2;
|
public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;
|
||||||
|
|
||||||
|
public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
|
||||||
|
"hbase.oldwals.cleaner.thread.timeout.msec";
|
||||||
|
@VisibleForTesting
|
||||||
|
static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;
|
||||||
|
|
||||||
|
public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC =
|
||||||
|
"hbase.oldwals.cleaner.thread.check.interval.msec";
|
||||||
|
@VisibleForTesting
|
||||||
|
static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500L;
|
||||||
|
|
||||||
|
|
||||||
private final LinkedBlockingQueue<CleanerContext> pendingDelete;
|
private final LinkedBlockingQueue<CleanerContext> pendingDelete;
|
||||||
private List<Thread> oldWALsCleaner;
|
private List<Thread> oldWALsCleaner;
|
||||||
|
private long cleanerThreadTimeoutMsec;
|
||||||
|
private long cleanerThreadCheckIntervalMsec;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param period the period of time to sleep between each run
|
* @param period the period of time to sleep between each run
|
||||||
|
@ -63,8 +77,12 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
|
||||||
Path oldLogDir) {
|
Path oldLogDir) {
|
||||||
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS);
|
super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS);
|
||||||
this.pendingDelete = new LinkedBlockingQueue<>();
|
this.pendingDelete = new LinkedBlockingQueue<>();
|
||||||
int size = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
|
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
|
||||||
this.oldWALsCleaner = createOldWalsCleaner(size);
|
this.oldWALsCleaner = createOldWalsCleaner(size);
|
||||||
|
this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
|
||||||
|
DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
|
||||||
|
this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
|
||||||
|
DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -77,7 +95,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
|
||||||
public void onConfigurationChange(Configuration conf) {
|
public void onConfigurationChange(Configuration conf) {
|
||||||
super.onConfigurationChange(conf);
|
super.onConfigurationChange(conf);
|
||||||
|
|
||||||
int newSize = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
|
int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
|
||||||
if (newSize == oldWALsCleaner.size()) {
|
if (newSize == oldWALsCleaner.size()) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Size from configuration is the same as previous which is " +
|
LOG.debug("Size from configuration is the same as previous which is " +
|
||||||
|
@ -87,13 +105,18 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
|
||||||
}
|
}
|
||||||
interruptOldWALsCleaner();
|
interruptOldWALsCleaner();
|
||||||
oldWALsCleaner = createOldWalsCleaner(newSize);
|
oldWALsCleaner = createOldWalsCleaner(newSize);
|
||||||
|
cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
|
||||||
|
DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
|
||||||
|
cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
|
||||||
|
DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
|
protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
|
||||||
List<CleanerContext> results = new LinkedList<>();
|
List<CleanerContext> results = new LinkedList<>();
|
||||||
for (FileStatus toDelete : filesToDelete) {
|
for (FileStatus toDelete : filesToDelete) {
|
||||||
CleanerContext context = CleanerContext.createCleanerContext(toDelete);
|
CleanerContext context = CleanerContext.createCleanerContext(toDelete,
|
||||||
|
cleanerThreadTimeoutMsec);
|
||||||
if (context != null) {
|
if (context != null) {
|
||||||
pendingDelete.add(context);
|
pendingDelete.add(context);
|
||||||
results.add(context);
|
results.add(context);
|
||||||
|
@ -102,7 +125,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
|
||||||
|
|
||||||
int deletedFiles = 0;
|
int deletedFiles = 0;
|
||||||
for (CleanerContext res : results) {
|
for (CleanerContext res : results) {
|
||||||
deletedFiles += res.getResult(500) ? 1 : 0;
|
deletedFiles += res.getResult(cleanerThreadCheckIntervalMsec) ? 1 : 0;
|
||||||
}
|
}
|
||||||
return deletedFiles;
|
return deletedFiles;
|
||||||
}
|
}
|
||||||
|
@ -118,6 +141,16 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
|
||||||
return oldWALsCleaner.size();
|
return oldWALsCleaner.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
long getCleanerThreadTimeoutMsec() {
|
||||||
|
return cleanerThreadTimeoutMsec;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
long getCleanerThreadCheckIntervalMsec() {
|
||||||
|
return cleanerThreadCheckIntervalMsec;
|
||||||
|
}
|
||||||
|
|
||||||
private List<Thread> createOldWalsCleaner(int size) {
|
private List<Thread> createOldWalsCleaner(int size) {
|
||||||
LOG.info("Creating OldWALs cleaners with size=" + size);
|
LOG.info("Creating OldWALs cleaners with size=" + size);
|
||||||
|
|
||||||
|
@ -186,20 +219,20 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class CleanerContext {
|
private static final class CleanerContext {
|
||||||
// At most waits 60 seconds
|
|
||||||
static final long MAX_WAIT = 60 * 1000;
|
|
||||||
|
|
||||||
final FileStatus target;
|
final FileStatus target;
|
||||||
volatile boolean result;
|
volatile boolean result;
|
||||||
volatile boolean setFromCleaner = false;
|
volatile boolean setFromCleaner = false;
|
||||||
|
long timeoutMsec;
|
||||||
|
|
||||||
static CleanerContext createCleanerContext(FileStatus status) {
|
static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) {
|
||||||
return status != null ? new CleanerContext(status) : null;
|
return status != null ? new CleanerContext(status, timeoutMsec) : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private CleanerContext(FileStatus status) {
|
private CleanerContext(FileStatus status, long timeoutMsec) {
|
||||||
this.target = status;
|
this.target = status;
|
||||||
this.result = false;
|
this.result = false;
|
||||||
|
this.timeoutMsec = timeoutMsec;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void setResult(boolean res) {
|
synchronized void setResult(boolean res) {
|
||||||
|
@ -209,13 +242,15 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized boolean getResult(long waitIfNotFinished) {
|
synchronized boolean getResult(long waitIfNotFinished) {
|
||||||
long totalTime = 0;
|
long totalTimeMsec = 0;
|
||||||
try {
|
try {
|
||||||
while (!setFromCleaner) {
|
while (!setFromCleaner) {
|
||||||
|
long startTimeNanos = System.nanoTime();
|
||||||
wait(waitIfNotFinished);
|
wait(waitIfNotFinished);
|
||||||
totalTime += waitIfNotFinished;
|
totalTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
|
||||||
if (totalTime >= MAX_WAIT) {
|
TimeUnit.NANOSECONDS);
|
||||||
LOG.warn("Spend too much time to delete oldwals " + target);
|
if (totalTimeMsec >= timeoutMsec) {
|
||||||
|
LOG.warn("Spend too much time " + totalTimeMsec + " ms to delete oldwals " + target);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -352,6 +352,8 @@ public class TestHFileCleaner {
|
||||||
final int SMALL_FILE_NUM = 20;
|
final int SMALL_FILE_NUM = 20;
|
||||||
final int LARGE_THREAD_NUM = 2;
|
final int LARGE_THREAD_NUM = 2;
|
||||||
final int SMALL_THREAD_NUM = 4;
|
final int SMALL_THREAD_NUM = 4;
|
||||||
|
final long THREAD_TIMEOUT_MSEC = 30 * 1000L;
|
||||||
|
final long THREAD_CHECK_INTERVAL_MSEC = 500L;
|
||||||
|
|
||||||
Configuration conf = UTIL.getConfiguration();
|
Configuration conf = UTIL.getConfiguration();
|
||||||
// no cleaner policies = delete all files
|
// no cleaner policies = delete all files
|
||||||
|
@ -369,6 +371,10 @@ public class TestHFileCleaner {
|
||||||
Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
|
Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
|
||||||
Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
|
Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
|
||||||
Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
|
Assert.assertEquals(ORIGINAL_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
|
||||||
|
Assert.assertEquals(HFileCleaner.DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC,
|
||||||
|
cleaner.getCleanerThreadTimeoutMsec());
|
||||||
|
Assert.assertEquals(HFileCleaner.DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
|
||||||
|
cleaner.getCleanerThreadCheckIntervalMsec());
|
||||||
|
|
||||||
// clean up archive directory and create files for testing
|
// clean up archive directory and create files for testing
|
||||||
fs.delete(archivedHfileDir, true);
|
fs.delete(archivedHfileDir, true);
|
||||||
|
@ -396,6 +402,10 @@ public class TestHFileCleaner {
|
||||||
newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
|
newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE);
|
||||||
newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_THREAD_NUMBER, LARGE_THREAD_NUM);
|
newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_THREAD_NUMBER, LARGE_THREAD_NUM);
|
||||||
newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_THREAD_NUMBER, SMALL_THREAD_NUM);
|
newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_THREAD_NUMBER, SMALL_THREAD_NUM);
|
||||||
|
newConf.setLong(HFileCleaner.HFILE_DELETE_THREAD_TIMEOUT_MSEC, THREAD_TIMEOUT_MSEC);
|
||||||
|
newConf.setLong(HFileCleaner.HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
|
||||||
|
THREAD_CHECK_INTERVAL_MSEC);
|
||||||
|
|
||||||
LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
|
LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
|
||||||
+ "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
|
+ "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
|
||||||
cleaner.onConfigurationChange(newConf);
|
cleaner.onConfigurationChange(newConf);
|
||||||
|
@ -405,6 +415,8 @@ public class TestHFileCleaner {
|
||||||
Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
|
Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize());
|
||||||
Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
|
Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize());
|
||||||
Assert.assertEquals(LARGE_THREAD_NUM + SMALL_THREAD_NUM, cleaner.getCleanerThreads().size());
|
Assert.assertEquals(LARGE_THREAD_NUM + SMALL_THREAD_NUM, cleaner.getCleanerThreads().size());
|
||||||
|
Assert.assertEquals(THREAD_TIMEOUT_MSEC, cleaner.getCleanerThreadTimeoutMsec());
|
||||||
|
Assert.assertEquals(THREAD_CHECK_INTERVAL_MSEC, cleaner.getCleanerThreadCheckIntervalMsec());
|
||||||
|
|
||||||
// make sure no cost when onConfigurationChange called with no change
|
// make sure no cost when onConfigurationChange called with no change
|
||||||
List<Thread> oldThreads = cleaner.getCleanerThreads();
|
List<Thread> oldThreads = cleaner.getCleanerThreads();
|
||||||
|
|
|
@ -267,14 +267,23 @@ public class TestLogsCleaner {
|
||||||
@Test
|
@Test
|
||||||
public void testOnConfigurationChange() throws Exception {
|
public void testOnConfigurationChange() throws Exception {
|
||||||
Configuration conf = TEST_UTIL.getConfiguration();
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE);
|
conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE,
|
||||||
|
LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
|
||||||
|
conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
|
||||||
|
LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
|
||||||
|
conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
|
||||||
|
LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
|
||||||
// Prepare environments
|
// Prepare environments
|
||||||
Server server = new DummyServer();
|
Server server = new DummyServer();
|
||||||
Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
|
Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
|
||||||
HConstants.HREGION_OLDLOGDIR_NAME);
|
HConstants.HREGION_OLDLOGDIR_NAME);
|
||||||
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||||
LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir);
|
LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir);
|
||||||
assertEquals(LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE, cleaner.getSizeOfCleaners());
|
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners());
|
||||||
|
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
|
||||||
|
cleaner.getCleanerThreadTimeoutMsec());
|
||||||
|
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
|
||||||
|
cleaner.getCleanerThreadCheckIntervalMsec());
|
||||||
// Create dir and files for test
|
// Create dir and files for test
|
||||||
fs.delete(oldWALsDir, true);
|
fs.delete(oldWALsDir, true);
|
||||||
fs.mkdirs(oldWALsDir);
|
fs.mkdirs(oldWALsDir);
|
||||||
|
@ -288,9 +297,16 @@ public class TestLogsCleaner {
|
||||||
thread.start();
|
thread.start();
|
||||||
// change size of cleaners dynamically
|
// change size of cleaners dynamically
|
||||||
int sizeToChange = 4;
|
int sizeToChange = 4;
|
||||||
conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, sizeToChange);
|
long threadTimeoutToChange = 30 * 1000L;
|
||||||
|
long threadCheckIntervalToChange = 250L;
|
||||||
|
conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange);
|
||||||
|
conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
|
||||||
|
conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
|
||||||
|
threadCheckIntervalToChange);
|
||||||
cleaner.onConfigurationChange(conf);
|
cleaner.onConfigurationChange(conf);
|
||||||
assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
|
assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
|
||||||
|
assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
|
||||||
|
assertEquals(threadCheckIntervalToChange, cleaner.getCleanerThreadCheckIntervalMsec());
|
||||||
// Stop chore
|
// Stop chore
|
||||||
thread.join();
|
thread.join();
|
||||||
status = fs.listStatus(oldWALsDir);
|
status = fs.listStatus(oldWALsDir);
|
||||||
|
|
|
@ -1071,6 +1071,8 @@ Here are those configurations:
|
||||||
| hbase.regionserver.hfilecleaner.small.queue.size
|
| hbase.regionserver.hfilecleaner.small.queue.size
|
||||||
| hbase.regionserver.hfilecleaner.large.thread.count
|
| hbase.regionserver.hfilecleaner.large.thread.count
|
||||||
| hbase.regionserver.hfilecleaner.small.thread.count
|
| hbase.regionserver.hfilecleaner.small.thread.count
|
||||||
|
| hbase.regionserver.hfilecleaner.thread.timeout.msec
|
||||||
|
| hbase.regionserver.hfilecleaner.thread.check.interval.msec
|
||||||
| hbase.regionserver.flush.throughput.controller
|
| hbase.regionserver.flush.throughput.controller
|
||||||
| hbase.hstore.compaction.max.size
|
| hbase.hstore.compaction.max.size
|
||||||
| hbase.hstore.compaction.max.size.offpeak
|
| hbase.hstore.compaction.max.size.offpeak
|
||||||
|
@ -1091,6 +1093,8 @@ Here are those configurations:
|
||||||
| hbase.offpeak.start.hour
|
| hbase.offpeak.start.hour
|
||||||
| hbase.offpeak.end.hour
|
| hbase.offpeak.end.hour
|
||||||
| hbase.oldwals.cleaner.thread.size
|
| hbase.oldwals.cleaner.thread.size
|
||||||
|
| hbase.oldwals.cleaner.thread.timeout.msec
|
||||||
|
| hbase.oldwals.cleaner.thread.check.interval.msec
|
||||||
| hbase.procedure.worker.keep.alive.time.msec
|
| hbase.procedure.worker.keep.alive.time.msec
|
||||||
| hbase.procedure.worker.add.stuck.percentage
|
| hbase.procedure.worker.add.stuck.percentage
|
||||||
| hbase.procedure.worker.monitor.interval.msec
|
| hbase.procedure.worker.monitor.interval.msec
|
||||||
|
|
Loading…
Reference in New Issue