HBASE-20095 Redesign single instance pool in CleanerChore - addendum simplifies onConfigurationChange

This commit is contained in:
tedyu 2018-04-05 10:25:09 -07:00
parent 039bc73571
commit e78a8e08f0
1 changed files with 9 additions and 16 deletions

View File

@ -89,38 +89,34 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
} }
/** /**
* Checks if pool can be updated immediately. * Checks if pool can be updated. If so, mark for update later.
* @param conf configuration * @param conf configuration
* @return true if pool can be updated immediately, false otherwise
*/ */
synchronized boolean canUpdateImmediately(Configuration conf) { synchronized void markUpdate(Configuration conf) {
int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE)); int newSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
if (newSize == size) { if (newSize == size) {
LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize); LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
return false; return;
} }
size = newSize; size = newSize;
if (pool.getPoolSize() == 0) {
// chore has no working thread.
return true;
}
// Chore is working, update it later. // Chore is working, update it later.
reconfigNotification.set(true); reconfigNotification.set(true);
return false;
} }
/** /**
* Update pool with new size. * Update pool with new size.
*/ */
synchronized void updatePool(long timeout) { synchronized void updatePool(long timeout) {
while (cleanerLatch != 0) { long stopTime = System.currentTimeMillis() + timeout;
while (cleanerLatch != 0 && timeout > 0) {
try { try {
wait(timeout); wait(timeout);
timeout = stopTime - System.currentTimeMillis();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
// It's ok to ignore Thread.currentThread().interrupt();
}
break; break;
} }
}
pool.shutdownNow(); pool.shutdownNow();
LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size); LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), size);
pool = new ForkJoinPool(size); pool = new ForkJoinPool(size);
@ -243,10 +239,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
@Override @Override
public void onConfigurationChange(Configuration conf) { public void onConfigurationChange(Configuration conf) {
if (POOL.canUpdateImmediately(conf)) { POOL.markUpdate(conf);
// Can immediately update, no need to wait.
POOL.updatePool(0);
}
} }
/** /**