HBASE-14655 Narrow the scope of doAs() calls to region observer notifications for compaction
parent: cdf2c01a76
commit: bc990a3328
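Before this change, CompactSplitThread's CompactionRunner wrapped the whole doCompaction() call in user.getUGI().doAs(), so the entire compaction ran as the request's effective user. After this change the User is threaded down through HRegion, HStore, CompactionContext and Compactor, and only the region observer notifications (preCompactSelection, postCompactSelection, preCompactScannerOpen, preCompact, postCompact) run inside doAs(). A minimal sketch of the guard the diff repeats at each notification site is shown below; it is lifted from the postCompact case in the HStore hunks and is illustrative only, not a complete class:

    // Narrowed doAs(): only the coprocessor hook runs as the effective user.
    // When user is null the hook is invoked directly, preserving the old behaviour.
    if (user == null) {
      getCoprocessorHost().postCompact(thisStore, sf, cr);
    } else {
      try {
        user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            getCoprocessorHost().postCompact(thisStore, sf, cr);
            return null;
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }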
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -352,7 +351,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
 
     CompactionContext compaction = null;
     if (selectNow) {
-      compaction = selectCompaction(r, s, priority, request);
+      compaction = selectCompaction(r, s, priority, request, user);
       if (compaction == null) return null; // message logged inside
     }
 
@@ -370,10 +369,10 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
   }
 
   private CompactionContext selectCompaction(final Region r, final Store s,
-      int priority, CompactionRequest request) throws IOException {
-    CompactionContext compaction = s.requestCompaction(priority, request);
+      int priority, CompactionRequest request, User user) throws IOException {
+    CompactionContext compaction = s.requestCompaction(priority, request, user);
     if (compaction == null) {
-      if(LOG.isDebugEnabled()) {
+      if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
        LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
            " because compaction request was cancelled");
      }
@@ -484,7 +483,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
          : ("Store = " + store.toString() + ", pri = " + queuedPriority);
    }
 
-    private void doCompaction() {
+    private void doCompaction(User user) {
      // Common case - system compaction without a file selection. Select now.
      if (this.compaction == null) {
        int oldPriority = this.queuedPriority;
@@ -496,7 +495,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
          return;
        }
        try {
-          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
+          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
        } catch (IOException ex) {
          LOG.error("Compaction selection failed " + this, ex);
          server.checkFileSystem();
@@ -528,7 +527,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
        // put it into region/store/etc. This is CST logic.
        long start = EnvironmentEdgeManager.currentTime();
        boolean completed =
-            region.compact(compaction, store, compactionThroughputController);
+            region.compact(compaction, store, compactionThroughputController, user);
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
            this + "; duration=" + StringUtils.formatTimeDiff(now, start));
@@ -564,22 +563,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
          || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
        return;
      }
-      if (this.user == null) doCompaction();
-      else {
-        try {
-          user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-              doCompaction();
-              return null;
-            }
-          });
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        } catch (IOException ioe) {
-          LOG.error("Encountered exception while compacting", ioe);
-        }
-      }
+      doCompaction(user);
    }
 
    private String formatStackTrace(Exception ex) {
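The hunks above remove the broad doAs() from CompactionRunner.run() and instead pass the User along the call chain. Reconstructed from the hunks in this commit (comment sketch only; argument lists abbreviated), the effective user now travels roughly like this before each hook is wrapped individually:

    // Call path of the User object, as reconstructed from this diff.
    //
    // CompactSplitThread.requestCompaction(..., user)
    //   -> selectCompaction(r, s, priority, request, user)
    //        -> Store.requestCompaction(priority, request, user)        // doAs() around pre/postCompactSelection
    //   -> CompactionRunner.doCompaction(user)
    //        -> HRegion.compact(compaction, store, throughputController, user)
    //             -> HStore.compact(compaction, throughputController, user)
    //                  -> CompactionContext.compact(throughputController, user)
    //                       -> Compactor.preCreateCoprocScanner(..., user)   // doAs() around preCompactScannerOpen
    //                       -> Compactor.postCreateCoprocScanner(..., user)  // doAs() around preCompact
    //                  -> moveCompatedFilesIntoPlace(cr, newFiles, user)      // doAs() around postCompact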
@@ -73,15 +73,14 @@ public interface CompactionRequestor {
   * @param requests custom compaction requests. Each compaction must specify the store on which it
   *          is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
   *          stores for the region.
-  * @user the effective user
+  * @param user the effective user
   * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
   *         compactions were started.
   * @throws IOException
   */
  List<CompactionRequest> requestCompaction(
      final Region r, final String why, int pri, List<Pair<CompactionRequest, Store>> requests,
-      User user
-  ) throws IOException;
+      User user) throws IOException;
 
  /**
   * @param r Region to compact
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolic
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
@@ -109,7 +110,13 @@ public class DefaultStoreEngine extends StoreEngine<
     @Override
     public List<Path> compact(CompactionThroughputController throughputController)
         throws IOException {
-      return compactor.compact(request, throughputController);
+      return compact(throughputController, null);
+    }
+
+    @Override
+    public List<Path> compact(CompactionThroughputController throughputController, User user)
+        throws IOException {
+      return compactor.compact(request, throughputController, user);
     }
 
     @Override
@@ -154,6 +154,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.ByteStringer;
@@ -1717,7 +1718,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (controller == null) {
           controller = NoLimitCompactionThroughputController.INSTANCE;
         }
-        compact(compaction, s, controller);
+        compact(compaction, s, controller, null);
       }
     }
   }
@@ -1732,7 +1733,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     for (Store s : getStores()) {
       CompactionContext compaction = s.requestCompaction();
       if (compaction != null) {
-        compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE);
+        compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
       }
     }
   }
@@ -1749,7 +1750,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     Store s = getStore(family);
     CompactionContext compaction = s.requestCompaction();
     if (compaction != null) {
-      compact(compaction, s, throughputController);
+      compact(compaction, s, throughputController, null);
     }
   }
 
@@ -1765,10 +1766,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   * server does them sequentially and not in parallel.
   *
   * @param compaction Compaction details, obtained by requestCompaction()
   * @param throughputController
   * @return whether the compaction completed
   */
  public boolean compact(CompactionContext compaction, Store store,
      CompactionThroughputController throughputController) throws IOException {
+    return compact(compaction, store, throughputController, null);
+  }
+
+  public boolean compact(CompactionContext compaction, Store store,
+      CompactionThroughputController throughputController, User user) throws IOException {
    assert compaction != null && compaction.hasSelection();
    assert !compaction.getRequest().getFiles().isEmpty();
    if (this.closing.get() || this.closed.get()) {
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.security.Key;
 import java.security.KeyException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1188,6 +1189,12 @@ public class HStore implements Store {
   @Override
   public List<StoreFile> compact(CompactionContext compaction,
       CompactionThroughputController throughputController) throws IOException {
+    return compact(compaction, throughputController, null);
+  }
+
+  @Override
+  public List<StoreFile> compact(CompactionContext compaction,
+      CompactionThroughputController throughputController, User user) throws IOException {
     assert compaction != null;
     List<StoreFile> sfs = null;
     CompactionRequest cr = compaction.getRequest();
@@ -1212,7 +1219,7 @@ public class HStore implements Store {
         + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
 
     // Commence the compaction.
-    List<Path> newFiles = compaction.compact(throughputController);
+    List<Path> newFiles = compaction.compact(throughputController, user);
 
     // TODO: get rid of this!
     if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
@@ -1227,7 +1234,7 @@ public class HStore implements Store {
       return sfs;
     }
     // Do the steps necessary to complete the compaction.
-    sfs = moveCompatedFilesIntoPlace(cr, newFiles);
+    sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
     writeCompactionWalRecord(filesToCompact, sfs);
     replaceStoreFiles(filesToCompact, sfs);
     if (cr.isMajor()) {
@@ -1248,13 +1255,30 @@ public class HStore implements Store {
   }
 
   private List<StoreFile> moveCompatedFilesIntoPlace(
-      CompactionRequest cr, List<Path> newFiles) throws IOException {
+      final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
     List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
     for (Path newFile : newFiles) {
       assert newFile != null;
-      StoreFile sf = moveFileIntoPlace(newFile);
+      final StoreFile sf = moveFileIntoPlace(newFile);
       if (this.getCoprocessorHost() != null) {
-        this.getCoprocessorHost().postCompact(this, sf, cr);
+        final Store thisStore = this;
+        if (user == null) {
+          getCoprocessorHost().postCompact(thisStore, sf, cr);
+        } else {
+          try {
+            user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                getCoprocessorHost().postCompact(thisStore, sf, cr);
+                return null;
+              }
+            });
+          } catch (InterruptedException ie) {
+            InterruptedIOException iioe = new InterruptedIOException();
+            iioe.initCause(ie);
+            throw iioe;
+          }
+        }
       }
       assert sf != null;
       sfs.add(sf);
@@ -1501,6 +1525,11 @@ public class HStore implements Store {
   @Override
   public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
       throws IOException {
+    return requestCompaction(priority, baseRequest, null);
+  }
+  @Override
+  public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
+      User user) throws IOException {
     // don't even select for compaction if writes are disabled
     if (!this.areWritesEnabled()) {
       return null;
@@ -1509,16 +1538,34 @@ public class HStore implements Store {
     // Before we do compaction, try to get rid of unneeded files to simplify things.
     removeUnneededFiles();
 
-    CompactionContext compaction = storeEngine.createCompaction();
+    final CompactionContext compaction = storeEngine.createCompaction();
     CompactionRequest request = null;
     this.lock.readLock().lock();
     try {
       synchronized (filesCompacting) {
+        final Store thisStore = this;
         // First, see if coprocessor would want to override selection.
         if (this.getCoprocessorHost() != null) {
-          List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
-          boolean override = this.getCoprocessorHost().preCompactSelection(
-              this, candidatesForCoproc, baseRequest);
+          final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
+          boolean override = false;
+          if (user == null) {
+            override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
+                baseRequest);
+          } else {
+            try {
+              override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
+                @Override
+                public Boolean run() throws Exception {
+                  return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,
+                      baseRequest);
+                }
+              });
+            } catch (InterruptedException ie) {
+              InterruptedIOException iioe = new InterruptedIOException();
+              iioe.initCause(ie);
+              throw iioe;
+            }
+          }
           if (override) {
             // Coprocessor is overriding normal file selection.
             compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
@@ -1546,8 +1593,25 @@ public class HStore implements Store {
         }
       }
       if (this.getCoprocessorHost() != null) {
-        this.getCoprocessorHost().postCompactSelection(
+        if (user == null) {
+          this.getCoprocessorHost().postCompactSelection(
             this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
+        } else {
+          try {
+            user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                getCoprocessorHost().postCompactSelection(
+                  thisStore,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest);
+                return null;
+              }
+            });
+          } catch (InterruptedException ie) {
+            InterruptedIOException iioe = new InterruptedIOException();
+            iioe.initCause(ie);
+            throw iioe;
+          }
+        }
       }
 
       // Selected files; see if we have a compaction with some custom base request.
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -207,14 +208,28 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
 
   CompactionContext requestCompaction() throws IOException;
 
+  /**
+   * @deprecated see requestCompaction(int, CompactionRequest, User)
+   */
+  @Deprecated
   CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
       throws IOException;
 
+  CompactionContext requestCompaction(int priority, CompactionRequest baseRequest, User user)
+      throws IOException;
+
   void cancelRequestedCompaction(CompactionContext compaction);
 
+  /**
+   * @deprecated see compact(CompactionContext, CompactionThroughputController, User)
+   */
+  @Deprecated
   List<StoreFile> compact(CompactionContext compaction,
       CompactionThroughputController throughputController) throws IOException;
 
+  List<StoreFile> compact(CompactionContext compaction,
+      CompactionThroughputController throughputController, User user) throws IOException;
+
   /**
    * @return true if we should run a major compaction.
    */
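The Store interface above keeps the old two-argument requestCompaction/compact overloads, now deprecated; HStore implements them by forwarding a null user. A hypothetical caller of the new overloads might look as follows (sketch only; priority, baseRequest, throughputController and user are assumed to already be in scope):

    // Hypothetical usage of the new User-aware Store methods declared above.
    CompactionContext compaction = store.requestCompaction(priority, baseRequest, user);
    if (compaction != null) {
      // Observer notifications fired during this call run under "user" via doAs().
      List<StoreFile> replaced = store.compact(compaction, throughputController, user);
    }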
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.security.User;
 
 import com.google.common.base.Preconditions;
 
@@ -102,7 +103,14 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
     public List<Path> compact(CompactionThroughputController throughputController)
         throws IOException {
       Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
-      return this.stripeRequest.execute(compactor, throughputController);
+      return this.stripeRequest.execute(compactor, throughputController, null);
     }
+
+    @Override
+    public List<Path> compact(CompactionThroughputController throughputController, User user)
+        throws IOException {
+      Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
+      return this.stripeRequest.execute(compactor, throughputController, user);
+    }
   }
 }
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.security.User;
 
 
 /**
@@ -71,6 +72,9 @@ public abstract class CompactionContext {
   public abstract List<Path> compact(CompactionThroughputController throughputController)
       throws IOException;
 
+  public abstract List<Path> compact(CompactionThroughputController throughputController, User user)
+      throws IOException;
+
   public CompactionRequest getRequest() {
     assert hasSelection();
     return this.request;
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@@ -210,9 +212,31 @@ public abstract class Compactor {
   */
  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
+    return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null);
+  }
+
+  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
+      final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
+      User user) throws IOException {
    if (store.getCoprocessorHost() == null) return null;
-    return store.getCoprocessorHost()
-        .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
+    if (user == null) {
+      return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
+        earliestPutTs, request);
+    } else {
+      try {
+        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
+          @Override
+          public InternalScanner run() throws Exception {
+            return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
+              scanType, earliestPutTs, request);
+          }
+        });
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
  }
 
  /**
@@ -223,9 +247,24 @@ public abstract class Compactor {
   * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
   */
  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
-      ScanType scanType, InternalScanner scanner) throws IOException {
-    if (store.getCoprocessorHost() == null) return scanner;
-    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+      final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
+    if (store.getCoprocessorHost() == null) return scanner;
+    if (user == null) {
+      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+    } else {
+      try {
+        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
+          @Override
+          public InternalScanner run() throws Exception {
+            return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+          }
+        });
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
  }
 
  /**
@@ -34,10 +34,11 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.security.User;
 
 /**
  * Compact passed set of files. Create an instance and then call
- * {@link #compact(CompactionRequest, CompactionThroughputController)}
+ * {@link #compact(CompactionRequest, CompactionThroughputController, User)}
  */
 @InterfaceAudience.Private
 public class DefaultCompactor extends Compactor {
@@ -51,7 +52,7 @@ public class DefaultCompactor extends Compactor {
   * Do a minor/major compaction on an explicit set of storefiles from a Store.
   */
  public List<Path> compact(final CompactionRequest request,
-      CompactionThroughputController throughputController) throws IOException {
+      CompactionThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
    this.progress = new CompactionProgress(fd.maxKeyCount);
 
@@ -86,11 +87,11 @@ public class DefaultCompactor extends Compactor {
 
      ScanType scanType =
          request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
-      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
+      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
      if (scanner == null) {
        scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
      }
-      scanner = postCreateCoprocScanner(request, scanType, scanner);
+      scanner = postCreateCoprocScanner(request, scanType, scanner, user);
      if (scanner == null) {
        // NULL scanner returned from coprocessor hooks means skip normal processing.
        return newFiles;
@@ -155,7 +156,7 @@ public class DefaultCompactor extends Compactor {
 
  /**
   * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
-   * {@link #compact(CompactionRequest, CompactionThroughputController)};
+   * {@link #compact(CompactionRequest, CompactionThroughputController, User)};
   * @param filesToCompact the files to compact. These are used as the compactionSelection for
   *          the generated {@link CompactionRequest}.
   * @param isMajor true to major compact (prune all deletes, max versions, etc)
@@ -167,6 +168,6 @@ public class DefaultCompactor extends Compactor {
      throws IOException {
    CompactionRequest cr = new CompactionRequest(filesToCompact);
    cr.setIsMajor(isMajor, isMajor);
-    return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE);
+    return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null);
  }
 }
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConcatenatedLists;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -390,6 +391,10 @@ public class StripeCompactionPolicy extends CompactionPolicy {
     protected CompactionRequest request;
     protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
 
+    public List<Path> execute(StripeCompactor compactor,
+        CompactionThroughputController throughputController) throws IOException {
+      return execute(compactor, throughputController, null);
+    }
     /**
      * Executes the request against compactor (essentially, just calls correct overload of
      * compact method), to simulate more dynamic dispatch.
@@ -397,7 +402,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
      * @return result of compact(...)
      */
     public abstract List<Path> execute(StripeCompactor compactor,
-        CompactionThroughputController throughputController) throws IOException;
+        CompactionThroughputController throughputController, User user) throws IOException;
 
     public StripeCompactionRequest(CompactionRequest request) {
       this.request = request;
@@ -449,9 +454,9 @@ public class StripeCompactionPolicy extends CompactionPolicy {
 
     @Override
     public List<Path> execute(StripeCompactor compactor,
-        CompactionThroughputController throughputController) throws IOException {
+        CompactionThroughputController throughputController, User user) throws IOException {
       return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
-          this.majorRangeToRow, throughputController);
+          this.majorRangeToRow, throughputController, user);
     }
   }
 
@@ -500,9 +505,9 @@ public class StripeCompactionPolicy extends CompactionPolicy {
 
     @Override
     public List<Path> execute(StripeCompactor compactor,
-        CompactionThroughputController throughputController) throws IOException {
+        CompactionThroughputController throughputController, User user) throws IOException {
       return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
-          this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController);
+          this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
     }
 
     /** Set major range of the compaction to the entire compaction range.
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -50,9 +51,16 @@ public class StripeCompactor extends Compactor {
     super(conf, store);
   }
 
   public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
+      CompactionThroughputController throughputController) throws IOException {
+    return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow,
+      throughputController, null);
+  }
+
+  public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
+      byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      CompactionThroughputController throughputController) throws IOException {
+      CompactionThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       StringBuilder sb = new StringBuilder();
       sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@@ -64,12 +72,19 @@ public class StripeCompactor extends Compactor {
     StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
         targetBoundaries, majorRangeFromRow, majorRangeToRow);
     return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
-        throughputController);
+        throughputController, user);
   }
 
   public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
       byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
+      CompactionThroughputController throughputController) throws IOException {
+    return compact(request, targetCount, targetSize, left, right, majorRangeFromRow,
+      majorRangeToRow, throughputController, null);
+  }
+
+  public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
+      byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      CompactionThroughputController throughputController) throws IOException {
+      CompactionThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Executing compaction with " + targetSize
         + " target file size, no more than " + targetCount + " files, in ["
@@ -78,12 +93,12 @@ public class StripeCompactor extends Compactor {
     StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
         targetCount, targetSize, left, right);
     return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
-        throughputController);
+        throughputController, user);
   }
 
   private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      CompactionThroughputController throughputController) throws IOException {
+      CompactionThroughputController throughputController, User user) throws IOException {
     final Collection<StoreFile> filesToCompact = request.getFiles();
     final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
     this.progress = new CompactionProgress(fd.maxKeyCount);
@@ -98,7 +113,7 @@ public class StripeCompactor extends Compactor {
     try {
       // Get scanner to use.
       ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
-      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners);
+      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners, user);
       if (scanner == null) {
         scanner = (majorRangeFromRow == null)
           ? createScanner(store, scanners,
@@ -106,7 +121,7 @@ public class StripeCompactor extends Compactor {
           : createScanner(store, scanners,
             smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
       }
-      scanner = postCreateCoprocScanner(request, coprocScanType, scanner);
+      scanner = postCreateCoprocScanner(request, coprocScanType, scanner, user);
       if (scanner == null) {
         // NULL scanner returned from coprocessor hooks means skip normal processing.
         return new ArrayList<Path>();
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -128,6 +129,16 @@ public class TestIOFencing {
       }
     }
 
+    @Override
+    public boolean compact(CompactionContext compaction, Store store,
+        CompactionThroughputController throughputController, User user) throws IOException {
+      try {
+        return super.compact(compaction, store, throughputController, user);
+      } finally {
+        compactCount++;
+      }
+    }
+
     public int countStoreFiles() {
       int count = 0;
       for (Store store : stores.values()) {
@@ -350,4 +361,4 @@ public class TestIOFencing {
     TEST_UTIL.shutdownMiniCluster();
   }
 }
-}
+}
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
@@ -236,6 +237,14 @@ public class TestRegionObserverScannerOpenHook {
       if (ret) compactionStateChangeLatch.countDown();
       return ret;
     }
+
+    @Override
+    public boolean compact(CompactionContext compaction, Store store,
+        CompactionThroughputController throughputController, User user) throws IOException {
+      boolean ret = super.compact(compaction, store, throughputController, user);
+      if (ret) compactionStateChangeLatch.countDown();
+      return ret;
+    }
   }
 
   /**
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.security.User;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -56,6 +57,8 @@ public class StatefulStoreMockMaker {
     Store store = mock(Store.class, name);
     when(store.requestCompaction(
         anyInt(), isNull(CompactionRequest.class))).then(new SelectAnswer());
+    when(store.requestCompaction(
+        anyInt(), isNull(CompactionRequest.class), any(User.class))).then(new SelectAnswer());
     when(store.getCompactPriority()).then(new PriorityAnswer());
     doAnswer(new CancelAnswer()).when(
         store).cancelRequestedCompaction(any(CompactionContext.class));
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -366,6 +367,12 @@ public class TestCompaction {
     @Override
     public List<Path> compact(CompactionThroughputController throughputController)
         throws IOException {
+      return compact(throughputController, null);
+    }
+
+    @Override
+    public List<Path> compact(CompactionThroughputController throughputController, User user)
+        throws IOException {
       finishCompaction(this.selectedFiles);
       return new ArrayList<Path>();
     }
@@ -418,6 +425,12 @@ public class TestCompaction {
     @Override
     public List<Path> compact(CompactionThroughputController throughputController)
         throws IOException {
+      return compact(throughputController, null);
+    }
+
+    @Override
+    public List<Path> compact(CompactionThroughputController throughputController, User user)
+        throws IOException {
       try {
         isInCompact = true;
         synchronized (this) {
@@ -499,7 +512,7 @@ public class TestCompaction {
     HRegion r = mock(HRegion.class);
     when(
       r.compact(any(CompactionContext.class), any(Store.class),
-        any(CompactionThroughputController.class))).then(new Answer<Boolean>() {
+        any(CompactionThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
       public Boolean answer(InvocationOnMock invocation) throws Throwable {
         invocation.getArgumentAt(0, CompactionContext.class).compact(
           invocation.getArgumentAt(2, CompactionThroughputController.class));
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputCont
 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -75,7 +76,8 @@ public class TestStripeStoreEngine {
     when(
       mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class),
         any(byte[].class), any(byte[].class), any(byte[].class),
-        any(CompactionThroughputController.class))).thenReturn(new ArrayList<Path>());
+        any(CompactionThroughputController.class), any(User.class)))
+        .thenReturn(new ArrayList<Path>());
 
     // Produce 3 L0 files.
     StoreFile sf = createFile();
@@ -96,7 +98,7 @@ public class TestStripeStoreEngine {
     compaction.compact(NoLimitCompactionThroughputController.INSTANCE);
     verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
       StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null,
-      NoLimitCompactionThroughputController.INSTANCE);
+      NoLimitCompactionThroughputController.INSTANCE, null);
   }
 
   private static StoreFile createFile() throws Exception {
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConcatenatedLists;
@@ -211,10 +212,10 @@ public class TestStripeCompactionPolicy {
     assertTrue(policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
-    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
+    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
     verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
       aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
-      any(NoLimitCompactionThroughputController.class));
+      any(NoLimitCompactionThroughputController.class), any(User.class));
   }
 
   @Test
@@ -456,7 +457,7 @@ public class TestStripeCompactionPolicy {
     // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
     // an empty file to preserve metadata
     StripeCompactor sc = createCompactor();
-    List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
+    List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
     assertEquals(1, paths.size());
   }
 
@@ -515,7 +516,7 @@ public class TestStripeCompactionPolicy {
     assertTrue(policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
-    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
+    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
     verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
       @Override
       public boolean matches(Object argument) {
@@ -529,7 +530,7 @@ public class TestStripeCompactionPolicy {
       }
     }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
       dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
-      any(NoLimitCompactionThroughputController.class));
+      any(NoLimitCompactionThroughputController.class), any(User.class));
   }
 
   /**
@@ -550,12 +551,12 @@ public class TestStripeCompactionPolicy {
     assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
-    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
+    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
     verify(sc, times(1)).compact(eq(scr.getRequest()),
       count == null ? anyInt() : eq(count.intValue()),
       size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
       dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
-      any(NoLimitCompactionThroughputController.class));
+      any(NoLimitCompactionThroughputController.class), any(User.class));
   }
 
   /** Verify arbitrary flush. */