HBASE-19069 Do not wrap the original CompactionLifeCycleTracker when calling CP hooks
This commit is contained in:
parent
3e0b90b949
commit
a6f89f029a
|
@ -237,80 +237,73 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||
}
|
||||
}
|
||||
|
||||
// A compaction life cycle tracker to trace the execution of all the compactions triggered by one
|
||||
// request and delegate to the source CompactionLifeCycleTracker. It will call completed method if
|
||||
// all the compactions are finished.
|
||||
private static final class AggregatingCompactionLifeCycleTracker
|
||||
implements CompactionLifeCycleTracker {
|
||||
private interface CompactionCompleteTracker {
|
||||
|
||||
default void completed(Store store) {
|
||||
}
|
||||
}
|
||||
|
||||
private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
|
||||
new CompactionCompleteTracker() {
|
||||
};
|
||||
|
||||
private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {
|
||||
|
||||
private final CompactionLifeCycleTracker tracker;
|
||||
|
||||
private final AtomicInteger remaining;
|
||||
|
||||
public AggregatingCompactionLifeCycleTracker(CompactionLifeCycleTracker tracker,
|
||||
int numberOfStores) {
|
||||
public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
|
||||
this.tracker = tracker;
|
||||
this.remaining = new AtomicInteger(numberOfStores);
|
||||
}
|
||||
|
||||
private void tryCompleted() {
|
||||
@Override
|
||||
public void completed(Store store) {
|
||||
if (remaining.decrementAndGet() == 0) {
|
||||
tracker.completed();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notExecuted(Store store, String reason) {
|
||||
tracker.notExecuted(store, reason);
|
||||
tryCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeExecution(Store store) {
|
||||
tracker.beforeExecution(store);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterExecution(Store store) {
|
||||
tracker.afterExecution(store);
|
||||
tryCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
private CompactionLifeCycleTracker wrap(CompactionLifeCycleTracker tracker,
|
||||
private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
|
||||
IntSupplier numberOfStores) {
|
||||
if (tracker == CompactionLifeCycleTracker.DUMMY) {
|
||||
// a simple optimization to avoid creating unnecessary objects as usually we do not care about
|
||||
// the life cycle of a compaction.
|
||||
return tracker;
|
||||
return DUMMY_COMPLETE_TRACKER;
|
||||
} else {
|
||||
return new AggregatingCompactionLifeCycleTracker(tracker, numberOfStores.getAsInt());
|
||||
return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void requestCompaction(HRegion region, String why, int priority,
|
||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
requestCompactionInternal(region, why, priority, true,
|
||||
wrap(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
|
||||
requestCompactionInternal(region, why, priority, true, tracker,
|
||||
getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
|
||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
requestCompactionInternal(region, store, why, priority, true, wrap(tracker, () -> 1), user);
|
||||
requestCompactionInternal(region, store, why, priority, true, tracker,
|
||||
getCompleteTracker(tracker, () -> 1), user);
|
||||
}
|
||||
|
||||
private void requestCompactionInternal(HRegion region, String why, int priority,
|
||||
boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
boolean selectNow, CompactionLifeCycleTracker tracker,
|
||||
CompactionCompleteTracker completeTracker, User user) throws IOException {
|
||||
// request compaction on all stores
|
||||
for (HStore store : region.stores.values()) {
|
||||
requestCompactionInternal(region, store, why, priority, selectNow, tracker, user);
|
||||
requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
|
||||
user);
|
||||
}
|
||||
}
|
||||
|
||||
private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
|
||||
boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
boolean selectNow, CompactionLifeCycleTracker tracker,
|
||||
CompactionCompleteTracker completeTracker, User user) throws IOException {
|
||||
if (this.server.isStopped() || (region.getTableDescriptor() != null &&
|
||||
!region.getTableDescriptor().isCompactionEnabled())) {
|
||||
return;
|
||||
|
@ -322,33 +315,36 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||
String reason = "Ignoring compaction request for " + region +
|
||||
" as an active space quota violation " + " policy disallows compactions.";
|
||||
tracker.notExecuted(store, reason);
|
||||
completeTracker.completed(store);
|
||||
LOG.debug(reason);
|
||||
return;
|
||||
}
|
||||
|
||||
Optional<CompactionContext> compaction;
|
||||
CompactionContext compaction;
|
||||
if (selectNow) {
|
||||
compaction = selectCompaction(region, store, priority, tracker, user);
|
||||
if (!compaction.isPresent()) {
|
||||
Optional<CompactionContext> c = selectCompaction(region, store, priority, tracker, completeTracker, user);
|
||||
if (!c.isPresent()) {
|
||||
// message logged inside
|
||||
return;
|
||||
}
|
||||
compaction = c.get();
|
||||
} else {
|
||||
compaction = Optional.empty();
|
||||
compaction = null;
|
||||
}
|
||||
|
||||
ThreadPoolExecutor pool;
|
||||
if (selectNow) {
|
||||
// compaction.get is safe as we will just return if selectNow is true but no compaction is
|
||||
// selected
|
||||
pool = store.throttleCompaction(compaction.get().getRequest().getSize()) ? longCompactions
|
||||
pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions
|
||||
: shortCompactions;
|
||||
} else {
|
||||
// We assume that most compactions are small. So, put system compactions into small
|
||||
// pool; we will do selection there, and move to large pool if necessary.
|
||||
pool = shortCompactions;
|
||||
}
|
||||
pool.execute(new CompactionRunner(store, region, compaction, pool, user));
|
||||
pool.execute(
|
||||
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
|
||||
region.incrementCompactionsQueuedCount();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
String type = (pool == shortCompactions) ? "Small " : "Large ";
|
||||
|
@ -358,23 +354,25 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||
}
|
||||
|
||||
public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
|
||||
requestCompactionInternal(region, why, NO_PRIORITY, false,
|
||||
CompactionLifeCycleTracker.DUMMY, null);
|
||||
requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
|
||||
DUMMY_COMPLETE_TRACKER, null);
|
||||
}
|
||||
|
||||
public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
|
||||
throws IOException {
|
||||
requestCompactionInternal(region, store, why, NO_PRIORITY, false,
|
||||
CompactionLifeCycleTracker.DUMMY, null);
|
||||
CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
|
||||
}
|
||||
|
||||
private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
|
||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
|
||||
throws IOException {
|
||||
Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
|
||||
if (!compaction.isPresent() && region.getRegionInfo() != null) {
|
||||
String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() +
|
||||
" because compaction request was cancelled";
|
||||
tracker.notExecuted(store, reason);
|
||||
completeTracker.completed(store);
|
||||
LOG.debug(reason);
|
||||
}
|
||||
return compaction;
|
||||
|
@ -491,12 +489,12 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||
if (cmp != 0) {
|
||||
return cmp;
|
||||
}
|
||||
Optional<CompactionContext> c1 = o1.compaction;
|
||||
Optional<CompactionContext> c2 = o2.compaction;
|
||||
if (c1.isPresent()) {
|
||||
return c2.isPresent() ? compare(c1.get().getRequest(), c2.get().getRequest()) : -1;
|
||||
CompactionContext c1 = o1.compaction;
|
||||
CompactionContext c2 = o2.compaction;
|
||||
if (c1 != null) {
|
||||
return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
|
||||
} else {
|
||||
return c2.isPresent() ? 1 : 0;
|
||||
return c2 != null ? 1 : 0;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -504,19 +502,24 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||
private final class CompactionRunner implements Runnable {
|
||||
private final HStore store;
|
||||
private final HRegion region;
|
||||
private final Optional<CompactionContext> compaction;
|
||||
private final CompactionContext compaction;
|
||||
private final CompactionLifeCycleTracker tracker;
|
||||
private final CompactionCompleteTracker completeTracker;
|
||||
private int queuedPriority;
|
||||
private ThreadPoolExecutor parent;
|
||||
private User user;
|
||||
private long time;
|
||||
|
||||
public CompactionRunner(HStore store, HRegion region, Optional<CompactionContext> compaction,
|
||||
public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
|
||||
CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
|
||||
ThreadPoolExecutor parent, User user) {
|
||||
this.store = store;
|
||||
this.region = region;
|
||||
this.compaction = compaction;
|
||||
this.queuedPriority = compaction.isPresent() ? compaction.get().getRequest().getPriority()
|
||||
: store.getCompactPriority();
|
||||
this.tracker = tracker;
|
||||
this.completeTracker = completeTracker;
|
||||
this.queuedPriority =
|
||||
compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
|
||||
this.parent = parent;
|
||||
this.user = user;
|
||||
this.time = EnvironmentEdgeManager.currentTime();
|
||||
|
@ -524,15 +527,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return compaction.map(c -> "Request = " + c.getRequest())
|
||||
.orElse("regionName = " + region.toString() + ", storeName = " + store.toString() +
|
||||
", priority = " + queuedPriority + ", time = " + time);
|
||||
if (compaction != null) {
|
||||
return "Request = " + compaction.getRequest();
|
||||
} else {
|
||||
return "regionName = " + region.toString() + ", storeName = " + store.toString() +
|
||||
", priority = " + queuedPriority + ", time = " + time;
|
||||
}
|
||||
}
|
||||
|
||||
private void doCompaction(User user) {
|
||||
CompactionContext c;
|
||||
// Common case - system compaction without a file selection. Select now.
|
||||
if (!compaction.isPresent()) {
|
||||
if (compaction == null) {
|
||||
int oldPriority = this.queuedPriority;
|
||||
this.queuedPriority = this.store.getCompactPriority();
|
||||
if (this.queuedPriority > oldPriority) {
|
||||
|
@ -543,8 +549,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||
}
|
||||
Optional<CompactionContext> selected;
|
||||
try {
|
||||
selected = selectCompaction(this.region, this.store, queuedPriority,
|
||||
CompactionLifeCycleTracker.DUMMY, user);
|
||||
selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
|
||||
completeTracker, user);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Compaction selection failed " + this, ex);
|
||||
server.checkFileSystem();
|
||||
|
@ -572,12 +578,12 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||
return;
|
||||
}
|
||||
} else {
|
||||
c = compaction.get();
|
||||
c = compaction;
|
||||
}
|
||||
// Finally we can compact something.
|
||||
assert c != null;
|
||||
|
||||
c.getRequest().getTracker().beforeExecution(store);
|
||||
tracker.beforeExecution(store);
|
||||
try {
|
||||
// Note: please don't put single-compaction logic here;
|
||||
// put it into region/store/etc. This is CST logic.
|
||||
|
@ -610,7 +616,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||
region.reportCompactionRequestFailure();
|
||||
server.checkFileSystem();
|
||||
} finally {
|
||||
c.getRequest().getTracker().afterExecution(store);
|
||||
tracker.afterExecution(store);
|
||||
completeTracker.completed(store);
|
||||
region.decrementCompactionsQueuedCount();
|
||||
LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
|
||||
}
|
||||
|
@ -645,7 +652,9 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
|
|||
if (runnable instanceof CompactionRunner) {
|
||||
CompactionRunner runner = (CompactionRunner) runnable;
|
||||
LOG.debug("Compaction Rejected: " + runner);
|
||||
runner.compaction.ifPresent(c -> runner.store.cancelRequestedCompaction(c));
|
||||
if (runner.compaction != null) {
|
||||
runner.store.cancelRequestedCompaction(runner.compaction);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.client.Put;
|
|||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
|
||||
|
@ -76,7 +78,12 @@ public class TestCompactionLifeCycleTracker {
|
|||
private static CompactionLifeCycleTracker TRACKER = null;
|
||||
|
||||
// make sure that we pass the correct CompactionLifeCycleTracker to CP hooks.
|
||||
public static final class CompactionObserver implements RegionObserver {
|
||||
public static final class CompactionObserver implements RegionObserver, RegionCoprocessor {
|
||||
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
|
|
Loading…
Reference in New Issue