HBASE-19069 Do not wrap the original CompactionLifeCycleTracker when calling CP hooks

This commit is contained in:
zhangduo 2017-10-23 21:10:44 +08:00
parent 81133f89fc
commit 37b29e909d
2 changed files with 80 additions and 64 deletions

View File

@ -237,80 +237,73 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
}
}
// A compaction life cycle tracker to trace the execution of all the compactions triggered by one
// request and delegate to the source CompactionLifeCycleTracker. It will call completed method if
// all the compactions are finished.
private static final class AggregatingCompactionLifeCycleTracker
implements CompactionLifeCycleTracker {
private interface CompactionCompleteTracker {
default void completed(Store store) {
}
}
private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
new CompactionCompleteTracker() {
};
private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {
private final CompactionLifeCycleTracker tracker;
private final AtomicInteger remaining;
public AggregatingCompactionLifeCycleTracker(CompactionLifeCycleTracker tracker,
int numberOfStores) {
public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
this.tracker = tracker;
this.remaining = new AtomicInteger(numberOfStores);
}
private void tryCompleted() {
@Override
public void completed(Store store) {
if (remaining.decrementAndGet() == 0) {
tracker.completed();
}
}
@Override
public void notExecuted(Store store, String reason) {
tracker.notExecuted(store, reason);
tryCompleted();
}
@Override
public void beforeExecution(Store store) {
tracker.beforeExecution(store);
}
@Override
public void afterExecution(Store store) {
tracker.afterExecution(store);
tryCompleted();
}
}
private CompactionLifeCycleTracker wrap(CompactionLifeCycleTracker tracker,
private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
IntSupplier numberOfStores) {
if (tracker == CompactionLifeCycleTracker.DUMMY) {
// a simple optimization to avoid creating unnecessary objects as usually we do not care about
// the life cycle of a compaction.
return tracker;
return DUMMY_COMPLETE_TRACKER;
} else {
return new AggregatingCompactionLifeCycleTracker(tracker, numberOfStores.getAsInt());
return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
}
}
@Override
public synchronized void requestCompaction(HRegion region, String why, int priority,
CompactionLifeCycleTracker tracker, User user) throws IOException {
requestCompactionInternal(region, why, priority, true,
wrap(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
requestCompactionInternal(region, why, priority, true, tracker,
getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
}
@Override
public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
CompactionLifeCycleTracker tracker, User user) throws IOException {
requestCompactionInternal(region, store, why, priority, true, wrap(tracker, () -> 1), user);
requestCompactionInternal(region, store, why, priority, true, tracker,
getCompleteTracker(tracker, () -> 1), user);
}
private void requestCompactionInternal(HRegion region, String why, int priority,
boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
boolean selectNow, CompactionLifeCycleTracker tracker,
CompactionCompleteTracker completeTracker, User user) throws IOException {
// request compaction on all stores
for (HStore store : region.stores.values()) {
requestCompactionInternal(region, store, why, priority, selectNow, tracker, user);
requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
user);
}
}
private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
boolean selectNow, CompactionLifeCycleTracker tracker,
CompactionCompleteTracker completeTracker, User user) throws IOException {
if (this.server.isStopped() || (region.getTableDescriptor() != null &&
!region.getTableDescriptor().isCompactionEnabled())) {
return;
@ -322,33 +315,36 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
String reason = "Ignoring compaction request for " + region +
" as an active space quota violation " + " policy disallows compactions.";
tracker.notExecuted(store, reason);
completeTracker.completed(store);
LOG.debug(reason);
return;
}
Optional<CompactionContext> compaction;
CompactionContext compaction;
if (selectNow) {
compaction = selectCompaction(region, store, priority, tracker, user);
if (!compaction.isPresent()) {
Optional<CompactionContext> c = selectCompaction(region, store, priority, tracker, completeTracker, user);
if (!c.isPresent()) {
// message logged inside
return;
}
compaction = c.get();
} else {
compaction = Optional.empty();
compaction = null;
}
ThreadPoolExecutor pool;
if (selectNow) {
// compaction.get is safe as we will just return if selectNow is true but no compaction is
// selected
pool = store.throttleCompaction(compaction.get().getRequest().getSize()) ? longCompactions
pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions
: shortCompactions;
} else {
// We assume that most compactions are small. So, put system compactions into small
// pool; we will do selection there, and move to large pool if necessary.
pool = shortCompactions;
}
pool.execute(new CompactionRunner(store, region, compaction, pool, user));
pool.execute(
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
region.incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large ";
@ -358,23 +354,25 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
}
public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
requestCompactionInternal(region, why, NO_PRIORITY, false,
CompactionLifeCycleTracker.DUMMY, null);
requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
DUMMY_COMPLETE_TRACKER, null);
}
public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
throws IOException {
requestCompactionInternal(region, store, why, NO_PRIORITY, false,
CompactionLifeCycleTracker.DUMMY, null);
CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
}
private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
CompactionLifeCycleTracker tracker, User user) throws IOException {
CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
throws IOException {
Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
if (!compaction.isPresent() && region.getRegionInfo() != null) {
String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() +
" because compaction request was cancelled";
tracker.notExecuted(store, reason);
completeTracker.completed(store);
LOG.debug(reason);
}
return compaction;
@ -491,12 +489,12 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
if (cmp != 0) {
return cmp;
}
Optional<CompactionContext> c1 = o1.compaction;
Optional<CompactionContext> c2 = o2.compaction;
if (c1.isPresent()) {
return c2.isPresent() ? compare(c1.get().getRequest(), c2.get().getRequest()) : -1;
CompactionContext c1 = o1.compaction;
CompactionContext c2 = o2.compaction;
if (c1 != null) {
return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
} else {
return c2.isPresent() ? 1 : 0;
return c2 != null ? 1 : 0;
}
}
};
@ -504,19 +502,24 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
private final class CompactionRunner implements Runnable {
private final HStore store;
private final HRegion region;
private final Optional<CompactionContext> compaction;
private final CompactionContext compaction;
private final CompactionLifeCycleTracker tracker;
private final CompactionCompleteTracker completeTracker;
private int queuedPriority;
private ThreadPoolExecutor parent;
private User user;
private long time;
public CompactionRunner(HStore store, HRegion region, Optional<CompactionContext> compaction,
public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
ThreadPoolExecutor parent, User user) {
this.store = store;
this.region = region;
this.compaction = compaction;
this.queuedPriority = compaction.isPresent() ? compaction.get().getRequest().getPriority()
: store.getCompactPriority();
this.tracker = tracker;
this.completeTracker = completeTracker;
this.queuedPriority =
compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
this.parent = parent;
this.user = user;
this.time = EnvironmentEdgeManager.currentTime();
@ -524,15 +527,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
@Override
public String toString() {
return compaction.map(c -> "Request = " + c.getRequest())
.orElse("regionName = " + region.toString() + ", storeName = " + store.toString() +
", priority = " + queuedPriority + ", time = " + time);
if (compaction != null) {
return "Request = " + compaction.getRequest();
} else {
return "regionName = " + region.toString() + ", storeName = " + store.toString() +
", priority = " + queuedPriority + ", time = " + time;
}
}
private void doCompaction(User user) {
CompactionContext c;
// Common case - system compaction without a file selection. Select now.
if (!compaction.isPresent()) {
if (compaction == null) {
int oldPriority = this.queuedPriority;
this.queuedPriority = this.store.getCompactPriority();
if (this.queuedPriority > oldPriority) {
@ -543,8 +549,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
}
Optional<CompactionContext> selected;
try {
selected = selectCompaction(this.region, this.store, queuedPriority,
CompactionLifeCycleTracker.DUMMY, user);
selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
completeTracker, user);
} catch (IOException ex) {
LOG.error("Compaction selection failed " + this, ex);
server.checkFileSystem();
@ -572,12 +578,12 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
return;
}
} else {
c = compaction.get();
c = compaction;
}
// Finally we can compact something.
assert c != null;
c.getRequest().getTracker().beforeExecution(store);
tracker.beforeExecution(store);
try {
// Note: please don't put single-compaction logic here;
// put it into region/store/etc. This is CST logic.
@ -610,7 +616,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
region.reportCompactionRequestFailure();
server.checkFileSystem();
} finally {
c.getRequest().getTracker().afterExecution(store);
tracker.afterExecution(store);
completeTracker.completed(store);
region.decrementCompactionsQueuedCount();
LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
}
@ -645,7 +652,9 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
if (runnable instanceof CompactionRunner) {
CompactionRunner runner = (CompactionRunner) runnable;
LOG.debug("Compaction Rejected: " + runner);
runner.compaction.ifPresent(c -> runner.store.cancelRequestedCompaction(c));
if (runner.compaction != null) {
runner.store.cancelRequestedCompaction(runner.compaction);
}
}
}
}

View File

@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
@ -76,7 +78,12 @@ public class TestCompactionLifeCycleTracker {
private static CompactionLifeCycleTracker TRACKER = null;
// make sure that we pass the correct CompactionLifeCycleTracker to CP hooks.
public static final class CompactionObserver implements RegionObserver {
public static final class CompactionObserver implements RegionObserver, RegionCoprocessor {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,