HBASE-14655 Narrow the scope of doAs() calls to region observer notifications for compaction

This commit is contained in:
tedyu 2015-10-23 14:48:04 -07:00
parent f34011860e
commit 8b7796b0b6
17 changed files with 260 additions and 72 deletions

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -352,7 +351,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
CompactionContext compaction = null;
if (selectNow) {
compaction = selectCompaction(r, s, priority, request);
compaction = selectCompaction(r, s, priority, request, user);
if (compaction == null) return null; // message logged inside
}
@ -370,10 +369,10 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
}
private CompactionContext selectCompaction(final Region r, final Store s,
int priority, CompactionRequest request) throws IOException {
CompactionContext compaction = s.requestCompaction(priority, request);
int priority, CompactionRequest request, User user) throws IOException {
CompactionContext compaction = s.requestCompaction(priority, request, user);
if (compaction == null) {
if(LOG.isDebugEnabled()) {
if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
" because compaction request was cancelled");
}
@ -484,7 +483,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
: ("Store = " + store.toString() + ", pri = " + queuedPriority);
}
private void doCompaction() {
private void doCompaction(User user) {
// Common case - system compaction without a file selection. Select now.
if (this.compaction == null) {
int oldPriority = this.queuedPriority;
@ -496,7 +495,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
return;
}
try {
this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
} catch (IOException ex) {
LOG.error("Compaction selection failed " + this, ex);
server.checkFileSystem();
@ -528,7 +527,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
// put it into region/store/etc. This is CST logic.
long start = EnvironmentEdgeManager.currentTime();
boolean completed =
region.compact(compaction, store, compactionThroughputController);
region.compact(compaction, store, compactionThroughputController, user);
long now = EnvironmentEdgeManager.currentTime();
LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
this + "; duration=" + StringUtils.formatTimeDiff(now, start));
@ -565,22 +564,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|| (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
return;
}
if (this.user == null) doCompaction();
else {
try {
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
doCompaction();
return null;
}
});
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} catch (IOException ioe) {
LOG.error("Encountered exception while compacting", ioe);
}
}
doCompaction(user);
}
private String formatStackTrace(Exception ex) {

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolic
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
@ -121,7 +122,13 @@ public class DefaultStoreEngine extends StoreEngine<
@Override
public List<Path> compact(CompactionThroughputController throughputController)
throws IOException {
return compactor.compact(request, throughputController);
return compact(throughputController, null);
}
@Override
public List<Path> compact(CompactionThroughputController throughputController, User user)
throws IOException {
return compactor.compact(request, throughputController, user);
}
@Override

View File

@ -156,6 +156,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.ByteStringer;
@ -1715,7 +1716,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (controller == null) {
controller = NoLimitCompactionThroughputController.INSTANCE;
}
compact(compaction, s, controller);
compact(compaction, s, controller, null);
}
}
}
@ -1730,7 +1731,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (Store s : getStores()) {
CompactionContext compaction = s.requestCompaction();
if (compaction != null) {
compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE);
compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
}
}
}
@ -1747,7 +1748,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Store s = getStore(family);
CompactionContext compaction = s.requestCompaction();
if (compaction != null) {
compact(compaction, s, throughputController);
compact(compaction, s, throughputController, null);
}
}
@ -1763,10 +1764,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* server does them sequentially and not in parallel.
*
* @param compaction Compaction details, obtained by requestCompaction()
* @param throughputController
* @return whether the compaction completed
*/
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController) throws IOException {
return compact(compaction, store, throughputController, null);
}
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController, User user) throws IOException {
assert compaction != null && compaction.hasSelection();
assert !compaction.getRequest().getFiles().isEmpty();
if (this.closing.get() || this.closed.get()) {

View File

@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.security.Key;
import java.security.KeyException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -1208,6 +1209,12 @@ public class HStore implements Store {
@Override
public List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController) throws IOException {
return compact(compaction, throughputController, null);
}
@Override
public List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController, User user) throws IOException {
assert compaction != null;
List<StoreFile> sfs = null;
CompactionRequest cr = compaction.getRequest();
@ -1232,7 +1239,7 @@ public class HStore implements Store {
+ TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
// Commence the compaction.
List<Path> newFiles = compaction.compact(throughputController);
List<Path> newFiles = compaction.compact(throughputController, user);
// TODO: get rid of this!
if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
@ -1247,7 +1254,7 @@ public class HStore implements Store {
return sfs;
}
// Do the steps necessary to complete the compaction.
sfs = moveCompatedFilesIntoPlace(cr, newFiles);
sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
if (cr.isMajor()) {
@ -1268,13 +1275,30 @@ public class HStore implements Store {
}
private List<StoreFile> moveCompatedFilesIntoPlace(
CompactionRequest cr, List<Path> newFiles) throws IOException {
final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
for (Path newFile : newFiles) {
assert newFile != null;
StoreFile sf = moveFileIntoPlace(newFile);
final StoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompact(this, sf, cr);
final Store thisStore = this;
if (user == null) {
getCoprocessorHost().postCompact(thisStore, sf, cr);
} else {
try {
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
getCoprocessorHost().postCompact(thisStore, sf, cr);
return null;
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
assert sf != null;
sfs.add(sf);
@ -1521,6 +1545,11 @@ public class HStore implements Store {
@Override
public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
throws IOException {
return requestCompaction(priority, baseRequest, null);
}
@Override
public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
User user) throws IOException {
// don't even select for compaction if writes are disabled
if (!this.areWritesEnabled()) {
return null;
@ -1529,16 +1558,34 @@ public class HStore implements Store {
// Before we do compaction, try to get rid of unneeded files to simplify things.
removeUnneededFiles();
CompactionContext compaction = storeEngine.createCompaction();
final CompactionContext compaction = storeEngine.createCompaction();
CompactionRequest request = null;
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
final Store thisStore = this;
// First, see if coprocessor would want to override selection.
if (this.getCoprocessorHost() != null) {
List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
boolean override = this.getCoprocessorHost().preCompactSelection(
this, candidatesForCoproc, baseRequest);
final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
boolean override = false;
if (user == null) {
override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
baseRequest);
} else {
try {
override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,
baseRequest);
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
if (override) {
// Coprocessor is overriding normal file selection.
compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
@ -1566,8 +1613,25 @@ public class HStore implements Store {
}
}
if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompactSelection(
if (user == null) {
this.getCoprocessorHost().postCompactSelection(
this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
} else {
try {
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
getCoprocessorHost().postCompactSelection(
thisStore,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest);
return null;
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
// Selected files; see if we have a compaction with some custom base request.

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
/**
@ -193,14 +194,28 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
CompactionContext requestCompaction() throws IOException;
/**
* @deprecated see requestCompaction(int, CompactionRequest, User)
*/
@Deprecated
CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
throws IOException;
CompactionContext requestCompaction(int priority, CompactionRequest baseRequest, User user)
throws IOException;
void cancelRequestedCompaction(CompactionContext compaction);
/**
* @deprecated see compact(CompactionContext, CompactionThroughputController, User)
*/
@Deprecated
List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController) throws IOException;
List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController, User user) throws IOException;
/**
* @return true if we should run a major compaction.
*/

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.security.User;
import com.google.common.base.Preconditions;
@ -102,7 +103,14 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
public List<Path> compact(CompactionThroughputController throughputController)
throws IOException {
Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
return this.stripeRequest.execute(compactor, throughputController);
return this.stripeRequest.execute(compactor, throughputController, null);
}
@Override
public List<Path> compact(CompactionThroughputController throughputController, User user)
throws IOException {
Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
return this.stripeRequest.execute(compactor, throughputController, user);
}
}
}

View File

@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.security.User;
/**
@ -71,6 +72,9 @@ public abstract class CompactionContext {
public abstract List<Path> compact(CompactionThroughputController throughputController)
throws IOException;
public abstract List<Path> compact(CompactionThroughputController throughputController, User user)
throws IOException;
public CompactionRequest getRequest() {
assert hasSelection();
return this.request;

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -224,9 +226,31 @@ public abstract class Compactor {
*/
protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null);
}
protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
User user) throws IOException {
if (store.getCoprocessorHost() == null) return null;
return store.getCoprocessorHost()
.preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
if (user == null) {
return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
earliestPutTs, request);
} else {
try {
return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
@Override
public InternalScanner run() throws Exception {
return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
scanType, earliestPutTs, request);
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
/**
@ -237,9 +261,24 @@ public abstract class Compactor {
* @return Scanner scanner to use (usually the default); null if compaction should not proceed.
*/
protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
ScanType scanType, InternalScanner scanner) throws IOException {
if (store.getCoprocessorHost() == null) return scanner;
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
if (store.getCoprocessorHost() == null) return scanner;
if (user == null) {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
} else {
try {
return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
@Override
public InternalScanner run() throws Exception {
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
/**

View File

@ -34,10 +34,11 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.security.User;
/**
* Compact passed set of files. Create an instance and then call
* {@link #compact(CompactionRequest, CompactionThroughputController)}
* {@link #compact(CompactionRequest, CompactionThroughputController, User)}
*/
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor {
@ -51,7 +52,7 @@ public class DefaultCompactor extends Compactor {
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*/
public List<Path> compact(final CompactionRequest request,
CompactionThroughputController throughputController) throws IOException {
CompactionThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
this.progress = new CompactionProgress(fd.maxKeyCount);
@ -87,11 +88,11 @@ public class DefaultCompactor extends Compactor {
request.isRetainDeleteMarkers() ?
ScanType.COMPACT_RETAIN_DELETES :
ScanType.COMPACT_DROP_DELETES;
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
if (scanner == null) {
scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
}
scanner = postCreateCoprocScanner(request, scanType, scanner);
scanner = postCreateCoprocScanner(request, scanType, scanner, user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return newFiles;
@ -172,7 +173,7 @@ public class DefaultCompactor extends Compactor {
/**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
* {@link #compact(CompactionRequest, CompactionThroughputController)};
* {@link #compact(CompactionRequest, CompactionThroughputController, User)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for
* the generated {@link CompactionRequest}.
* @param isMajor true to major compact (prune all deletes, max versions, etc)
@ -184,6 +185,6 @@ public class DefaultCompactor extends Compactor {
throws IOException {
CompactionRequest cr = new CompactionRequest(filesToCompact);
cr.setIsMajor(isMajor, isMajor);
return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE);
return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null);
}
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -390,6 +391,10 @@ public class StripeCompactionPolicy extends CompactionPolicy {
protected CompactionRequest request;
protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
public List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController) throws IOException {
return execute(compactor, throughputController, null);
}
/**
* Executes the request against compactor (essentially, just calls correct overload of
* compact method), to simulate more dynamic dispatch.
@ -397,7 +402,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
* @return result of compact(...)
*/
public abstract List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController) throws IOException;
CompactionThroughputController throughputController, User user) throws IOException;
public StripeCompactionRequest(CompactionRequest request) {
this.request = request;
@ -449,9 +454,9 @@ public class StripeCompactionPolicy extends CompactionPolicy {
@Override
public List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController) throws IOException {
CompactionThroughputController throughputController, User user) throws IOException {
return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
this.majorRangeToRow, throughputController);
this.majorRangeToRow, throughputController, user);
}
}
@ -500,9 +505,9 @@ public class StripeCompactionPolicy extends CompactionPolicy {
@Override
public List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController) throws IOException {
CompactionThroughputController throughputController, User user) throws IOException {
return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController);
this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
}
/** Set major range of the compaction to the entire compaction range.

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -50,9 +51,16 @@ public class StripeCompactor extends Compactor {
super(conf, store);
}
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow,
throughputController, null);
}
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
CompactionThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@ -64,12 +72,19 @@ public class StripeCompactor extends Compactor {
StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
targetBoundaries, majorRangeFromRow, majorRangeToRow);
return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
throughputController);
throughputController, user);
}
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
return compact(request, targetCount, targetSize, left, right, majorRangeFromRow,
majorRangeToRow, throughputController, null);
}
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
CompactionThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + targetSize
+ " target file size, no more than " + targetCount + " files, in ["
@ -78,12 +93,12 @@ public class StripeCompactor extends Compactor {
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
targetCount, targetSize, left, right);
return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
throughputController);
throughputController, user);
}
private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
CompactionThroughputController throughputController, User user) throws IOException {
final Collection<StoreFile> filesToCompact = request.getFiles();
final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
this.progress = new CompactionProgress(fd.maxKeyCount);
@ -98,7 +113,7 @@ public class StripeCompactor extends Compactor {
try {
// Get scanner to use.
ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners);
scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners, user);
if (scanner == null) {
scanner = (majorRangeFromRow == null)
? createScanner(store, scanners,
@ -106,7 +121,7 @@ public class StripeCompactor extends Compactor {
: createScanner(store, scanners,
smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
}
scanner = postCreateCoprocScanner(request, coprocScanType, scanner);
scanner = postCreateCoprocScanner(request, coprocScanType, scanner, user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return new ArrayList<Path>();

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -129,6 +130,16 @@ public class TestIOFencing {
}
}
@Override
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController, User user) throws IOException {
try {
return super.compact(compaction, store, throughputController, user);
} finally {
compactCount++;
}
}
public int countStoreFiles() {
int count = 0;
for (Store store : stores.values()) {

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -239,6 +240,14 @@ public class TestRegionObserverScannerOpenHook {
if (ret) compactionStateChangeLatch.countDown();
return ret;
}
@Override
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController, User user) throws IOException {
boolean ret = super.compact(compaction, store, throughputController, user);
if (ret) compactionStateChangeLatch.countDown();
return ret;
}
}
/**

View File

@ -23,6 +23,7 @@ import static org.mockito.Mockito.*;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.security.User;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -56,6 +57,8 @@ public class StatefulStoreMockMaker {
Store store = mock(Store.class, name);
when(store.requestCompaction(
anyInt(), isNull(CompactionRequest.class))).then(new SelectAnswer());
when(store.requestCompaction(
anyInt(), isNull(CompactionRequest.class), any(User.class))).then(new SelectAnswer());
when(store.getCompactPriority()).then(new PriorityAnswer());
doAnswer(new CancelAnswer()).when(
store).cancelRequestedCompaction(any(CompactionContext.class));

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.client.Delete;
@ -367,6 +368,12 @@ public class TestCompaction {
@Override
public List<Path> compact(CompactionThroughputController throughputController)
throws IOException {
return compact(throughputController, null);
}
@Override
public List<Path> compact(CompactionThroughputController throughputController, User user)
throws IOException {
finishCompaction(this.selectedFiles);
return new ArrayList<Path>();
}
@ -419,6 +426,12 @@ public class TestCompaction {
@Override
public List<Path> compact(CompactionThroughputController throughputController)
throws IOException {
return compact(throughputController, null);
}
@Override
public List<Path> compact(CompactionThroughputController throughputController, User user)
throws IOException {
try {
isInCompact = true;
synchronized (this) {
@ -500,7 +513,7 @@ public class TestCompaction {
HRegion r = mock(HRegion.class);
when(
r.compact(any(CompactionContext.class), any(Store.class),
any(CompactionThroughputController.class))).then(new Answer<Boolean>() {
any(CompactionThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
public Boolean answer(InvocationOnMock invocation) throws Throwable {
invocation.getArgumentAt(0, CompactionContext.class).compact(
invocation.getArgumentAt(2, CompactionThroughputController.class));

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThrough
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
@ -76,7 +77,8 @@ public class TestStripeStoreEngine {
when(
mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class),
any(byte[].class), any(byte[].class), any(byte[].class),
any(CompactionThroughputController.class))).thenReturn(new ArrayList<Path>());
any(CompactionThroughputController.class), any(User.class)))
.thenReturn(new ArrayList<Path>());
// Produce 3 L0 files.
StoreFile sf = createFile();
@ -97,7 +99,7 @@ public class TestStripeStoreEngine {
compaction.compact(NoLimitCompactionThroughputController.INSTANCE);
verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null,
NoLimitCompactionThroughputController.INSTANCE);
NoLimitCompactionThroughputController.INSTANCE, null);
}
private static StoreFile createFile() throws Exception {

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -215,10 +216,10 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
any(NoLimitCompactionThroughputController.class));
any(NoLimitCompactionThroughputController.class), any(User.class));
}
@Test
@ -468,7 +469,7 @@ public class TestStripeCompactionPolicy {
// All the Stripes are expired, so the Compactor will not create any Writers. We need to create
// an empty file to preserve metadata
StripeCompactor sc = createCompactor();
List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
assertEquals(1, paths.size());
}
@ -527,7 +528,7 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
@Override
public boolean matches(Object argument) {
@ -541,7 +542,7 @@ public class TestStripeCompactionPolicy {
}
}), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
any(NoLimitCompactionThroughputController.class));
any(NoLimitCompactionThroughputController.class), any(User.class));
}
/**
@ -562,12 +563,12 @@ public class TestStripeCompactionPolicy {
assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
verify(sc, times(1)).compact(eq(scr.getRequest()),
count == null ? anyInt() : eq(count.intValue()),
size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
any(NoLimitCompactionThroughputController.class));
any(NoLimitCompactionThroughputController.class), any(User.class));
}
/** Verify arbitrary flush. */