diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index a9e2fcaf925..6ce90bcff0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -352,7 +351,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi CompactionContext compaction = null; if (selectNow) { - compaction = selectCompaction(r, s, priority, request); + compaction = selectCompaction(r, s, priority, request, user); if (compaction == null) return null; // message logged inside } @@ -370,10 +369,10 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } private CompactionContext selectCompaction(final Region r, final Store s, - int priority, CompactionRequest request) throws IOException { - CompactionContext compaction = s.requestCompaction(priority, request); + int priority, CompactionRequest request, User user) throws IOException { + CompactionContext compaction = s.requestCompaction(priority, request, user); if (compaction == null) { - if(LOG.isDebugEnabled()) { + if(LOG.isDebugEnabled() && r.getRegionInfo() != null) { LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() + " because compaction request was cancelled"); } @@ -484,7 +483,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi : ("Store = " + store.toString() + ", pri = " + queuedPriority); } - private void doCompaction() { + private void doCompaction(User user) { // Common case - system compaction without a file selection. Select now. if (this.compaction == null) { int oldPriority = this.queuedPriority; @@ -496,7 +495,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi return; } try { - this.compaction = selectCompaction(this.region, this.store, queuedPriority, null); + this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user); } catch (IOException ex) { LOG.error("Compaction selection failed " + this, ex); server.checkFileSystem(); @@ -528,7 +527,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi // put it into region/store/etc. This is CST logic. long start = EnvironmentEdgeManager.currentTime(); boolean completed = - region.compact(compaction, store, compactionThroughputController); + region.compact(compaction, store, compactionThroughputController, user); long now = EnvironmentEdgeManager.currentTime(); LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " + this + "; duration=" + StringUtils.formatTimeDiff(now, start)); @@ -565,22 +564,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { return; } - if (this.user == null) doCompaction(); - else { - try { - user.getUGI().doAs(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - doCompaction(); - return null; - } - }); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } catch (IOException ioe) { - LOG.error("Encountered exception while compacting", ioe); - } - } + doCompaction(user); } private String formatStackTrace(Exception ex) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index 51e1a2db6b4..da326e34178 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolic import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ReflectionUtils; /** @@ -121,7 +122,13 @@ public class DefaultStoreEngine extends StoreEngine< @Override public List compact(CompactionThroughputController throughputController) throws IOException { - return compactor.compact(request, throughputController); + return compact(throughputController, null); + } + + @Override + public List compact(CompactionThroughputController throughputController, User user) + throws IOException { + return compactor.compact(request, throughputController, user); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b6cdd29a11c..34738de285b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -156,6 +156,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.util.ByteStringer; @@ -1715,7 +1716,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (controller == null) { controller = NoLimitCompactionThroughputController.INSTANCE; } - compact(compaction, s, controller); + compact(compaction, s, controller, null); } } } @@ -1730,7 +1731,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi for (Store s : getStores()) { CompactionContext compaction = s.requestCompaction(); if (compaction != null) { - compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE); + compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null); } } } @@ -1747,7 +1748,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Store s = getStore(family); CompactionContext compaction = s.requestCompaction(); if (compaction != null) { - compact(compaction, s, throughputController); + compact(compaction, s, throughputController, null); } } @@ -1763,10 +1764,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * server does them sequentially and not in parallel. * * @param compaction Compaction details, obtained by requestCompaction() + * @param throughputController * @return whether the compaction completed */ public boolean compact(CompactionContext compaction, Store store, CompactionThroughputController throughputController) throws IOException { + return compact(compaction, store, throughputController, null); + } + + public boolean compact(CompactionContext compaction, Store store, + CompactionThroughputController throughputController, User user) throws IOException { assert compaction != null && compaction.hasSelection(); assert !compaction.getRequest().getFiles().isEmpty(); if (this.closing.get() || this.closed.get()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 2ba8dc5b866..3c9e00a0032 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -23,6 +23,7 @@ import java.io.InterruptedIOException; import java.net.InetSocketAddress; import java.security.Key; import java.security.KeyException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -1208,6 +1209,12 @@ public class HStore implements Store { @Override public List compact(CompactionContext compaction, CompactionThroughputController throughputController) throws IOException { + return compact(compaction, throughputController, null); + } + + @Override + public List compact(CompactionContext compaction, + CompactionThroughputController throughputController, User user) throws IOException { assert compaction != null; List sfs = null; CompactionRequest cr = compaction.getRequest(); @@ -1232,7 +1239,7 @@ public class HStore implements Store { + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1)); // Commence the compaction. - List newFiles = compaction.compact(throughputController); + List newFiles = compaction.compact(throughputController, user); // TODO: get rid of this! if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { @@ -1247,7 +1254,7 @@ public class HStore implements Store { return sfs; } // Do the steps necessary to complete the compaction. - sfs = moveCompatedFilesIntoPlace(cr, newFiles); + sfs = moveCompatedFilesIntoPlace(cr, newFiles, user); writeCompactionWalRecord(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs); if (cr.isMajor()) { @@ -1268,13 +1275,30 @@ public class HStore implements Store { } private List moveCompatedFilesIntoPlace( - CompactionRequest cr, List newFiles) throws IOException { + final CompactionRequest cr, List newFiles, User user) throws IOException { List sfs = new ArrayList(newFiles.size()); for (Path newFile : newFiles) { assert newFile != null; - StoreFile sf = moveFileIntoPlace(newFile); + final StoreFile sf = moveFileIntoPlace(newFile); if (this.getCoprocessorHost() != null) { - this.getCoprocessorHost().postCompact(this, sf, cr); + final Store thisStore = this; + if (user == null) { + getCoprocessorHost().postCompact(thisStore, sf, cr); + } else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + getCoprocessorHost().postCompact(thisStore, sf, cr); + return null; + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } assert sf != null; sfs.add(sf); @@ -1521,6 +1545,11 @@ public class HStore implements Store { @Override public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest) throws IOException { + return requestCompaction(priority, baseRequest, null); + } + @Override + public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest, + User user) throws IOException { // don't even select for compaction if writes are disabled if (!this.areWritesEnabled()) { return null; @@ -1529,16 +1558,34 @@ public class HStore implements Store { // Before we do compaction, try to get rid of unneeded files to simplify things. removeUnneededFiles(); - CompactionContext compaction = storeEngine.createCompaction(); + final CompactionContext compaction = storeEngine.createCompaction(); CompactionRequest request = null; this.lock.readLock().lock(); try { synchronized (filesCompacting) { + final Store thisStore = this; // First, see if coprocessor would want to override selection. if (this.getCoprocessorHost() != null) { - List candidatesForCoproc = compaction.preSelect(this.filesCompacting); - boolean override = this.getCoprocessorHost().preCompactSelection( - this, candidatesForCoproc, baseRequest); + final List candidatesForCoproc = compaction.preSelect(this.filesCompacting); + boolean override = false; + if (user == null) { + override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, + baseRequest); + } else { + try { + override = user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws Exception { + return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc, + baseRequest); + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } if (override) { // Coprocessor is overriding normal file selection. compaction.forceSelect(new CompactionRequest(candidatesForCoproc)); @@ -1566,8 +1613,25 @@ public class HStore implements Store { } } if (this.getCoprocessorHost() != null) { - this.getCoprocessorHost().postCompactSelection( + if (user == null) { + this.getCoprocessorHost().postCompactSelection( this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest); + } else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + getCoprocessorHost().postCompactSelection( + thisStore,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest); + return null; + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } // Selected files; see if we have a compaction with some custom base request. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 8d35a7d9da6..33e441614b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; /** @@ -193,14 +194,28 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf CompactionContext requestCompaction() throws IOException; + /** + * @deprecated see requestCompaction(int, CompactionRequest, User) + */ + @Deprecated CompactionContext requestCompaction(int priority, CompactionRequest baseRequest) throws IOException; + CompactionContext requestCompaction(int priority, CompactionRequest baseRequest, User user) + throws IOException; + void cancelRequestedCompaction(CompactionContext compaction); + /** + * @deprecated see compact(CompactionContext, CompactionThroughputController, User) + */ + @Deprecated List compact(CompactionContext compaction, CompactionThroughputController throughputController) throws IOException; + List compact(CompactionContext compaction, + CompactionThroughputController throughputController, User user) throws IOException; + /** * @return true if we should run a major compaction. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java index 26339a372c7..37072900d7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.security.User; import com.google.common.base.Preconditions; @@ -102,7 +103,14 @@ public class StripeStoreEngine extends StoreEngine compact(CompactionThroughputController throughputController) throws IOException { Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); - return this.stripeRequest.execute(compactor, throughputController); + return this.stripeRequest.execute(compactor, throughputController, null); + } + + @Override + public List compact(CompactionThroughputController throughputController, User user) + throws IOException { + Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); + return this.stripeRequest.execute(compactor, throughputController, user); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java index 1c89bf0a226..cb16966ef70 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.security.User; /** @@ -71,6 +72,9 @@ public abstract class CompactionContext { public abstract List compact(CompactionThroughputController throughputController) throws IOException; + public abstract List compact(CompactionThroughputController throughputController, User user) + throws IOException; + public CompactionRequest getRequest() { assert hasSelection(); return this.request; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index eaccd0d2763..660ea9134aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; import java.io.InterruptedIOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -224,9 +226,31 @@ public abstract class Compactor { */ protected InternalScanner preCreateCoprocScanner(final CompactionRequest request, ScanType scanType, long earliestPutTs, List scanners) throws IOException { + return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null); + } + + protected InternalScanner preCreateCoprocScanner(final CompactionRequest request, + final ScanType scanType, final long earliestPutTs, final List scanners, + User user) throws IOException { if (store.getCoprocessorHost() == null) return null; - return store.getCoprocessorHost() - .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request); + if (user == null) { + return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType, + earliestPutTs, request); + } else { + try { + return user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public InternalScanner run() throws Exception { + return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, + scanType, earliestPutTs, request); + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } /** @@ -237,9 +261,24 @@ public abstract class Compactor { * @return Scanner scanner to use (usually the default); null if compaction should not proceed. */ protected InternalScanner postCreateCoprocScanner(final CompactionRequest request, - ScanType scanType, InternalScanner scanner) throws IOException { - if (store.getCoprocessorHost() == null) return scanner; - return store.getCoprocessorHost().preCompact(store, scanner, scanType, request); + final ScanType scanType, final InternalScanner scanner, User user) throws IOException { + if (store.getCoprocessorHost() == null) return scanner; + if (user == null) { + return store.getCoprocessorHost().preCompact(store, scanner, scanType, request); + } else { + try { + return user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public InternalScanner run() throws Exception { + return store.getCoprocessorHost().preCompact(store, scanner, scanType, request); + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index f26f4fe3dfd..069d2212b1d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -34,10 +34,11 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.security.User; /** * Compact passed set of files. Create an instance and then call - * {@link #compact(CompactionRequest, CompactionThroughputController)} + * {@link #compact(CompactionRequest, CompactionThroughputController, User)} */ @InterfaceAudience.Private public class DefaultCompactor extends Compactor { @@ -51,7 +52,7 @@ public class DefaultCompactor extends Compactor { * Do a minor/major compaction on an explicit set of storefiles from a Store. */ public List compact(final CompactionRequest request, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, User user) throws IOException { FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles()); this.progress = new CompactionProgress(fd.maxKeyCount); @@ -87,11 +88,11 @@ public class DefaultCompactor extends Compactor { request.isRetainDeleteMarkers() ? ScanType.COMPACT_RETAIN_DELETES : ScanType.COMPACT_DROP_DELETES; - scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners); + scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user); if (scanner == null) { scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs); } - scanner = postCreateCoprocScanner(request, scanType, scanner); + scanner = postCreateCoprocScanner(request, scanType, scanner, user); if (scanner == null) { // NULL scanner returned from coprocessor hooks means skip normal processing. return newFiles; @@ -172,7 +173,7 @@ public class DefaultCompactor extends Compactor { /** * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to - * {@link #compact(CompactionRequest, CompactionThroughputController)}; + * {@link #compact(CompactionRequest, CompactionThroughputController, User)}; * @param filesToCompact the files to compact. These are used as the compactionSelection for * the generated {@link CompactionRequest}. * @param isMajor true to major compact (prune all deletes, max versions, etc) @@ -184,6 +185,6 @@ public class DefaultCompactor extends Compactor { throws IOException { CompactionRequest cr = new CompactionRequest(filesToCompact); cr.setIsMajor(isMajor, isMajor); - return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE); + return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index 0d49f095427..5f024b8af01 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreUtils; import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConcatenatedLists; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -390,6 +391,10 @@ public class StripeCompactionPolicy extends CompactionPolicy { protected CompactionRequest request; protected byte[] majorRangeFromRow = null, majorRangeToRow = null; + public List execute(StripeCompactor compactor, + CompactionThroughputController throughputController) throws IOException { + return execute(compactor, throughputController, null); + } /** * Executes the request against compactor (essentially, just calls correct overload of * compact method), to simulate more dynamic dispatch. @@ -397,7 +402,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { * @return result of compact(...) */ public abstract List execute(StripeCompactor compactor, - CompactionThroughputController throughputController) throws IOException; + CompactionThroughputController throughputController, User user) throws IOException; public StripeCompactionRequest(CompactionRequest request) { this.request = request; @@ -449,9 +454,9 @@ public class StripeCompactionPolicy extends CompactionPolicy { @Override public List execute(StripeCompactor compactor, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, User user) throws IOException { return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow, - this.majorRangeToRow, throughputController); + this.majorRangeToRow, throughputController, user); } } @@ -500,9 +505,9 @@ public class StripeCompactionPolicy extends CompactionPolicy { @Override public List execute(StripeCompactor compactor, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, User user) throws IOException { return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow, - this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController); + this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user); } /** Set major range of the compaction to the entire compaction range. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 6814b8c4036..021965c3d86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; /** @@ -50,9 +51,16 @@ public class StripeCompactor extends Compactor { super(conf, store); } + public List compact(CompactionRequest request, List targetBoundaries, + byte[] majorRangeFromRow, byte[] majorRangeToRow, + CompactionThroughputController throughputController) throws IOException { + return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow, + throughputController, null); + } + public List compact(CompactionRequest request, List targetBoundaries, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, User user) throws IOException { if (LOG.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:"); @@ -64,12 +72,19 @@ public class StripeCompactor extends Compactor { StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter( targetBoundaries, majorRangeFromRow, majorRangeToRow); return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow, - throughputController); + throughputController, user); + } + + public List compact(CompactionRequest request, int targetCount, long targetSize, + byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, + CompactionThroughputController throughputController) throws IOException { + return compact(request, targetCount, targetSize, left, right, majorRangeFromRow, + majorRangeToRow, throughputController, null); } public List compact(CompactionRequest request, int targetCount, long targetSize, byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, User user) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Executing compaction with " + targetSize + " target file size, no more than " + targetCount + " files, in [" @@ -78,12 +93,12 @@ public class StripeCompactor extends Compactor { StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter( targetCount, targetSize, left, right); return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow, - throughputController); + throughputController, user); } private List compactInternal(StripeMultiFileWriter mw, final CompactionRequest request, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, User user) throws IOException { final Collection filesToCompact = request.getFiles(); final FileDetails fd = getFileDetails(filesToCompact, request.isMajor()); this.progress = new CompactionProgress(fd.maxKeyCount); @@ -98,7 +113,7 @@ public class StripeCompactor extends Compactor { try { // Get scanner to use. ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES; - scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners); + scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners, user); if (scanner == null) { scanner = (majorRangeFromRow == null) ? createScanner(store, scanners, @@ -106,7 +121,7 @@ public class StripeCompactor extends Compactor { : createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow); } - scanner = postCreateCoprocScanner(request, coprocScanType, scanner); + scanner = postCreateCoprocScanner(request, coprocScanType, scanner, user); if (scanner == null) { // NULL scanner returned from coprocessor hooks means skip normal processing. return new ArrayList(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index bb216b68d8f..c2a23d607b5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; @@ -129,6 +130,16 @@ public class TestIOFencing { } } + @Override + public boolean compact(CompactionContext compaction, Store store, + CompactionThroughputController throughputController, User user) throws IOException { + try { + return super.compact(compaction, store, throughputController, user); + } finally { + compactCount++; + } + } + public int countStoreFiles() { int count = 0; for (Store store : stores.values()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 00808bde896..44e06bd2847 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -239,6 +240,14 @@ public class TestRegionObserverScannerOpenHook { if (ret) compactionStateChangeLatch.countDown(); return ret; } + + @Override + public boolean compact(CompactionContext compaction, Store store, + CompactionThroughputController throughputController, User user) throws IOException { + boolean ret = super.compact(compaction, store, throughputController, user); + if (ret) compactionStateChangeLatch.countDown(); + return ret; + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java index dd9c037f21f..052646235fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.*; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.security.User; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -56,6 +57,8 @@ public class StatefulStoreMockMaker { Store store = mock(Store.class, name); when(store.requestCompaction( anyInt(), isNull(CompactionRequest.class))).then(new SelectAnswer()); + when(store.requestCompaction( + anyInt(), isNull(CompactionRequest.class), any(User.class))).then(new SelectAnswer()); when(store.getCompactPriority()).then(new PriorityAnswer()); doAnswer(new CancelAnswer()).when( store).cancelRequestedCompaction(any(CompactionContext.class)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index a377325f2a5..04609720482 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.client.Delete; @@ -367,6 +368,12 @@ public class TestCompaction { @Override public List compact(CompactionThroughputController throughputController) throws IOException { + return compact(throughputController, null); + } + + @Override + public List compact(CompactionThroughputController throughputController, User user) + throws IOException { finishCompaction(this.selectedFiles); return new ArrayList(); } @@ -419,6 +426,12 @@ public class TestCompaction { @Override public List compact(CompactionThroughputController throughputController) throws IOException { + return compact(throughputController, null); + } + + @Override + public List compact(CompactionThroughputController throughputController, User user) + throws IOException { try { isInCompact = true; synchronized (this) { @@ -500,7 +513,7 @@ public class TestCompaction { HRegion r = mock(HRegion.class); when( r.compact(any(CompactionContext.class), any(Store.class), - any(CompactionThroughputController.class))).then(new Answer() { + any(CompactionThroughputController.class), any(User.class))).then(new Answer() { public Boolean answer(InvocationOnMock invocation) throws Throwable { invocation.getArgumentAt(0, CompactionContext.class).compact( invocation.getArgumentAt(2, CompactionThroughputController.class)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java index d9e3ea3d326..1454aa84d23 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThrough import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Test; @@ -76,7 +77,8 @@ public class TestStripeStoreEngine { when( mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class), - any(CompactionThroughputController.class))).thenReturn(new ArrayList()); + any(CompactionThroughputController.class), any(User.class))) + .thenReturn(new ArrayList()); // Produce 3 L0 files. StoreFile sf = createFile(); @@ -97,7 +99,7 @@ public class TestStripeStoreEngine { compaction.compact(NoLimitCompactionThroughputController.INSTANCE); verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L, StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null, - NoLimitCompactionThroughputController.INSTANCE); + NoLimitCompactionThroughputController.INSTANCE, null); } private static StoreFile createFile() throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index a0579ce8e39..56e71e8b0b3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager; import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -215,10 +216,10 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); assertEquals(si.getStorefiles(), scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), - any(NoLimitCompactionThroughputController.class)); + any(NoLimitCompactionThroughputController.class), any(User.class)); } @Test @@ -468,7 +469,7 @@ public class TestStripeCompactionPolicy { // All the Stripes are expired, so the Compactor will not create any Writers. We need to create // an empty file to preserve metadata StripeCompactor sc = createCompactor(); - List paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + List paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); assertEquals(1, paths.size()); } @@ -527,7 +528,7 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher>() { @Override public boolean matches(Object argument) { @@ -541,7 +542,7 @@ public class TestStripeCompactionPolicy { } }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), - any(NoLimitCompactionThroughputController.class)); + any(NoLimitCompactionThroughputController.class), any(User.class)); } /** @@ -562,12 +563,12 @@ public class TestStripeCompactionPolicy { assertTrue(!needsCompaction || policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); verify(sc, times(1)).compact(eq(scr.getRequest()), count == null ? anyInt() : eq(count.intValue()), size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), - any(NoLimitCompactionThroughputController.class)); + any(NoLimitCompactionThroughputController.class), any(User.class)); } /** Verify arbitrary flush. */