HBASE-18815 We need to pass something like CompactionRequest in CP to give user some information about the compaction

CompactionRequest was removed from CP in HBASE-18453, this change reintroduces
CompatcionRequest to CP as a read-only interface called CompactionRequest.
The CompactionRequest class is renamed to CompactionRequestImpl.

Additionally, this change removes selectionTimeInNanos from CompactionRequest and
uses selectionTime as a replacement. This means that CompactionRequest:toString
is modified and compare as well.

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Peter Somogyi 2017-09-29 15:25:25 -07:00 committed by Michael Stack
parent 50265395d1
commit 0af61dce65
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
43 changed files with 382 additions and 262 deletions

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -228,7 +229,8 @@ public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObs
@Override @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException { InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
ScanInfo scanInfo = getScanInfo(store, c.getEnvironment()); ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
if (scanInfo == null) { if (scanInfo == null) {
// take default action // take default action

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
@ -187,9 +188,11 @@ public interface RegionObserver {
* @param store the store where compaction is being requested * @param store the store where compaction is being requested
* @param candidates the store files currently available for compaction * @param candidates the store files currently available for compaction
* @param tracker tracker used to track the life cycle of a compaction * @param tracker tracker used to track the life cycle of a compaction
* @param request the requested compaction
*/ */
default void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, default void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {} List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {}
/** /**
* Called after the {@link StoreFile}s to compact have been selected from the available * Called after the {@link StoreFile}s to compact have been selected from the available
@ -198,9 +201,11 @@ public interface RegionObserver {
* @param store the store being compacted * @param store the store being compacted
* @param selected the store files selected to compact * @param selected the store files selected to compact
* @param tracker tracker used to track the life cycle of a compaction * @param tracker tracker used to track the life cycle of a compaction
* @param request the requested compaction
*/ */
default void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, default void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ImmutableList<? extends StoreFile> selected, CompactionLifeCycleTracker tracker) {} ImmutableList<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
CompactionRequest request) {}
/** /**
* Called prior to writing the {@link StoreFile}s selected for compaction into a new * Called prior to writing the {@link StoreFile}s selected for compaction into a new
@ -221,11 +226,13 @@ public interface RegionObserver {
* @param scanner the scanner over existing data used in the store file rewriting * @param scanner the scanner over existing data used in the store file rewriting
* @param scanType type of Scan * @param scanType type of Scan
* @param tracker tracker used to track the life cycle of a compaction * @param tracker tracker used to track the life cycle of a compaction
* @param request the requested compaction
* @return the scanner to use during compaction. Should not be {@code null} unless the * @return the scanner to use during compaction. Should not be {@code null} unless the
* implementation is writing new store files on its own. * implementation is writing new store files on its own.
*/ */
default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request)
throws IOException { throws IOException {
return scanner; return scanner;
} }
@ -245,13 +252,15 @@ public interface RegionObserver {
* files * files
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @param tracker used to track the life cycle of a compaction * @param tracker used to track the life cycle of a compaction
* @param request the requested compaction
* @param readPoint the readpoint to create scanner * @param readPoint the readpoint to create scanner
* @return the scanner to use during compaction. {@code null} if the default implementation is to * @return the scanner to use during compaction. {@code null} if the default implementation is to
* be used. * be used.
*/ */
default InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, default InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException { InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
return s; return s;
} }
@ -261,9 +270,11 @@ public interface RegionObserver {
* @param store the store being compacted * @param store the store being compacted
* @param resultFile the new store file written out during compaction * @param resultFile the new store file written out during compaction
* @param tracker used to track the life cycle of a compaction * @param tracker used to track the life cycle of a compaction
* @param request the requested compaction
*/ */
default void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, default void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {} StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
throws IOException {}
/** /**
* Called before the region is reported as closed to the master. * Called before the region is reported as closed to the master.
@ -320,7 +331,7 @@ public interface RegionObserver {
* coprocessors * coprocessors
* @param c the environment provided by the region server * @param c the environment provided by the region server
* @param get the Get request * @param get the Get request
* @param exists * @param exists the result returned by the region server
* @return the value to return to the client if bypassing default processing * @return the value to return to the client if bypassing default processing
*/ */
default boolean preExists(ObserverContext<RegionCoprocessorEnvironment> c, Get get, default boolean preExists(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
@ -799,8 +810,8 @@ public interface RegionObserver {
* <p> * <p>
* See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
* and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, * and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionLifeCycleTracker, long)} to override scanners created for flushes * InternalScanner, CompactionLifeCycleTracker, CompactionRequest, long)} to override scanners
* or compactions, resp. * created for flushes or compactions, resp.
* <p> * <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors. * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
@ -41,17 +40,17 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
/** /**
* Compact passed set of files in the mob-enabled column family. * Compact passed set of files in the mob-enabled column family.
@ -66,7 +65,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
private final InternalScannerFactory scannerFactory = new InternalScannerFactory() { private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
@Override @Override
public ScanType getScanType(CompactionRequest request) { public ScanType getScanType(CompactionRequestImpl request) {
// retain the delete markers until they are expired. // retain the delete markers until they are expired.
return ScanType.COMPACT_RETAIN_DELETES; return ScanType.COMPACT_RETAIN_DELETES;
} }
@ -105,7 +104,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
} }
@Override @Override
public List<Path> compact(CompactionRequest request, ThroughputController throughputController, public List<Path> compact(CompactionRequestImpl request, ThroughputController throughputController,
User user) throws IOException { User user) throws IOException {
return compact(request, scannerFactory, writerFactory, throughputController, user); return compact(request, scannerFactory, writerFactory, throughputController, user);
} }

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -393,7 +393,7 @@ public class CompactSplit implements PropagatingConfigurationObserver {
private static final Comparator<Runnable> COMPARATOR = private static final Comparator<Runnable> COMPARATOR =
new Comparator<Runnable>() { new Comparator<Runnable>() {
private int compare(CompactionRequest r1, CompactionRequest r2) { private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
if (r1 == r2) { if (r1 == r2) {
return 0; //they are the same request return 0; //they are the same request
} }
@ -402,7 +402,7 @@ public class CompactSplit implements PropagatingConfigurationObserver {
if (cmp != 0) { if (cmp != 0) {
return cmp; return cmp;
} }
cmp = Long.compare(r1.getSelectionNanoTime(), r2.getSelectionNanoTime()); cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
if (cmp != 0) { if (cmp != 0) {
return cmp; return cmp;
} }

View File

@ -23,9 +23,9 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor; import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
@ -81,7 +81,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
} }
@Override @Override
public void forceSelect(CompactionRequest request) { public void forceSelect(CompactionRequestImpl request) {
if (!(request instanceof DateTieredCompactionRequest)) { if (!(request instanceof DateTieredCompactionRequest)) {
throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: " throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
+ request.getClass().getCanonicalName()); + request.getClass().getCanonicalName());

View File

@ -82,7 +82,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
@ -1349,9 +1349,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
ThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
assert compaction != null; assert compaction != null;
List<HStoreFile> sfs = null; List<HStoreFile> sfs = null;
CompactionRequest cr = compaction.getRequest(); CompactionRequestImpl cr = compaction.getRequest();
try { try {
// Do all sanity checking in here if we have a valid CompactionRequest // Do all sanity checking in here if we have a valid CompactionRequestImpl
// because we need to clean up after it on the way out in a finally // because we need to clean up after it on the way out in a finally
// block below // block below
long compactionStartTime = EnvironmentEdgeManager.currentTime(); long compactionStartTime = EnvironmentEdgeManager.currentTime();
@ -1387,7 +1387,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
return sfs; return sfs;
} }
// Do the steps necessary to complete the compaction. // Do the steps necessary to complete the compaction.
sfs = moveCompatedFilesIntoPlace(cr, newFiles, user); sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
writeCompactionWalRecord(filesToCompact, sfs); writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs);
if (cr.isMajor()) { if (cr.isMajor()) {
@ -1417,14 +1417,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
} }
} }
private List<HStoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles, private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr, List<Path> newFiles,
User user) throws IOException { User user) throws IOException {
List<HStoreFile> sfs = new ArrayList<>(newFiles.size()); List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
for (Path newFile : newFiles) { for (Path newFile : newFiles) {
assert newFile != null; assert newFile != null;
HStoreFile sf = moveFileIntoPlace(newFile); HStoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) { if (this.getCoprocessorHost() != null) {
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), user); getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
} }
assert sf != null; assert sf != null;
sfs.add(sf); sfs.add(sf);
@ -1483,7 +1483,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* @param compactionStartTime Start time. * @param compactionStartTime Start time.
*/ */
private void logCompactionEndMessage( private void logCompactionEndMessage(
CompactionRequest cr, List<HStoreFile> sfs, long now, long compactionStartTime) { CompactionRequestImpl cr, List<HStoreFile> sfs, long now, long compactionStartTime) {
StringBuilder message = new StringBuilder( StringBuilder message = new StringBuilder(
"Completed" + (cr.isMajor() ? " major" : "") + " compaction of " "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
+ cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
@ -1625,7 +1625,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
// Move the compaction into place. // Move the compaction into place.
HStoreFile sf = moveFileIntoPlace(newFile); HStoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) { if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompact(this, sf, null, null); this.getCoprocessorHost().postCompact(this, sf, null, null, null);
} }
replaceStoreFiles(filesToCompact, Collections.singletonList(sf)); replaceStoreFiles(filesToCompact, Collections.singletonList(sf));
completeCompaction(filesToCompact); completeCompaction(filesToCompact);
@ -1674,7 +1674,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
removeUnneededFiles(); removeUnneededFiles();
final CompactionContext compaction = storeEngine.createCompaction(); final CompactionContext compaction = storeEngine.createCompaction();
CompactionRequest request = null; CompactionRequestImpl request = null;
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
synchronized (filesCompacting) { synchronized (filesCompacting) {
@ -1682,11 +1682,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
if (this.getCoprocessorHost() != null) { if (this.getCoprocessorHost() != null) {
final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
boolean override = false; boolean override = false;
//TODO: is it correct way to get CompactionRequest?
override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
tracker, user); tracker, null, user);
if (override) { if (override) {
// Coprocessor is overriding normal file selection. // Coprocessor is overriding normal file selection.
compaction.forceSelect(new CompactionRequest(candidatesForCoproc)); compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
} }
} }
@ -1712,7 +1713,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
} }
if (this.getCoprocessorHost() != null) { if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompactSelection( this.getCoprocessorHost().postCompactSelection(
this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, user); this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
compaction.getRequest(), user);
} }
// Finally, we have the resulting files list. Check if we have any files at all. // Finally, we have the resulting files list. Check if we have any files at all.
request = compaction.getRequest(); request = compaction.getRequest();
@ -1790,7 +1792,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
finishCompactionRequest(compaction.getRequest()); finishCompactionRequest(compaction.getRequest());
} }
private void finishCompactionRequest(CompactionRequest cr) { private void finishCompactionRequest(CompactionRequestImpl cr) {
this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize()); this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
if (cr.isOffPeak()) { if (cr.isOffPeak()) {
offPeakCompactionTracker.set(false); offPeakCompactionTracker.set(false);

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
@ -544,11 +545,12 @@ public class RegionCoprocessorHost
/** /**
* See * See
* {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionLifeCycleTracker, long)} * InternalScanner, CompactionLifeCycleTracker, CompactionRequest, long)}
*/ */
public InternalScanner preCompactScannerOpen(final HStore store, public InternalScanner preCompactScannerOpen(final HStore store,
final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
final CompactionLifeCycleTracker tracker, final User user, final long readPoint) final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user,
final long readPoint)
throws IOException { throws IOException {
return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
new ObserverOperationWithResult<RegionObserver, InternalScanner>( new ObserverOperationWithResult<RegionObserver, InternalScanner>(
@ -556,7 +558,7 @@ public class RegionCoprocessorHost
@Override @Override
public InternalScanner call(RegionObserver observer) throws IOException { public InternalScanner call(RegionObserver observer) throws IOException {
return observer.preCompactScannerOpen(this, store, scanners, scanType, return observer.preCompactScannerOpen(this, store, scanners, scanType,
earliestPutTs, getResult(), tracker, readPoint); earliestPutTs, getResult(), tracker, request, readPoint);
} }
}); });
} }
@ -567,15 +569,18 @@ public class RegionCoprocessorHost
* @param store The store where compaction is being requested * @param store The store where compaction is being requested
* @param candidates The currently available store files * @param candidates The currently available store files
* @param tracker used to track the life cycle of a compaction * @param tracker used to track the life cycle of a compaction
* @param request the compaction request
* @param user the user
* @return If {@code true}, skip the normal selection process and use the current list * @return If {@code true}, skip the normal selection process and use the current list
* @throws IOException * @throws IOException
*/ */
public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates, public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
final CompactionLifeCycleTracker tracker, final User user) throws IOException { final CompactionLifeCycleTracker tracker, final CompactionRequest request,
final User user) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) { return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
@Override @Override
public void call(RegionObserver observer) throws IOException { public void call(RegionObserver observer) throws IOException {
observer.preCompactSelection(this, store, candidates, tracker); observer.preCompactSelection(this, store, candidates, tracker, request);
} }
}); });
} }
@ -586,13 +591,16 @@ public class RegionCoprocessorHost
* @param store The store where compaction is being requested * @param store The store where compaction is being requested
* @param selected The store files selected to compact * @param selected The store files selected to compact
* @param tracker used to track the life cycle of a compaction * @param tracker used to track the life cycle of a compaction
* @param request the compaction request
* @param user the user
*/ */
public void postCompactSelection(final HStore store, final ImmutableList<HStoreFile> selected, public void postCompactSelection(final HStore store, final ImmutableList<HStoreFile> selected,
final CompactionLifeCycleTracker tracker, final User user) throws IOException { final CompactionLifeCycleTracker tracker, final CompactionRequest request,
final User user) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
@Override @Override
public void call(RegionObserver observer) throws IOException { public void call(RegionObserver observer) throws IOException {
observer.postCompactSelection(this, store, selected, tracker); observer.postCompactSelection(this, store, selected, tracker, request);
} }
}); });
} }
@ -603,17 +611,19 @@ public class RegionCoprocessorHost
* @param scanner the scanner used to read store data during compaction * @param scanner the scanner used to read store data during compaction
* @param scanType type of Scan * @param scanType type of Scan
* @param tracker used to track the life cycle of a compaction * @param tracker used to track the life cycle of a compaction
* @param request the compaction request
* @param user the user
* @throws IOException * @throws IOException
*/ */
public InternalScanner preCompact(final HStore store, final InternalScanner scanner, public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
final ScanType scanType, final CompactionLifeCycleTracker tracker, final User user) final ScanType scanType, final CompactionLifeCycleTracker tracker,
throws IOException { final CompactionRequest request, final User user) throws IOException {
return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null : return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null :
new ObserverOperationWithResult<RegionObserver, InternalScanner>( new ObserverOperationWithResult<RegionObserver, InternalScanner>(
regionObserverGetter, user) { regionObserverGetter, user) {
@Override @Override
public InternalScanner call(RegionObserver observer) throws IOException { public InternalScanner call(RegionObserver observer) throws IOException {
return observer.preCompact(this, store, getResult(), scanType, tracker); return observer.preCompact(this, store, getResult(), scanType, tracker, request);
} }
}); });
} }
@ -623,14 +633,17 @@ public class RegionCoprocessorHost
* @param store the store being compacted * @param store the store being compacted
* @param resultFile the new store file written during compaction * @param resultFile the new store file written during compaction
* @param tracker used to track the life cycle of a compaction * @param tracker used to track the life cycle of a compaction
* @param request the compaction request
* @param user the user
* @throws IOException * @throws IOException
*/ */
public void postCompact(final HStore store, final HStoreFile resultFile, public void postCompact(final HStore store, final HStoreFile resultFile,
final CompactionLifeCycleTracker tracker, final User user) throws IOException { final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
@Override @Override
public void call(RegionObserver observer) throws IOException { public void call(RegionObserver observer) throws IOException {
observer.postCompact(this, store, resultFile, tracker); observer.postCompact(this, store, resultFile, tracker, request);
} }
}); });
} }

View File

@ -27,9 +27,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@ -84,12 +84,12 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
this.stripeRequest = compactionPolicy.selectCompaction( this.stripeRequest = compactionPolicy.selectCompaction(
storeFileManager, filesCompacting, mayUseOffPeak); storeFileManager, filesCompacting, mayUseOffPeak);
this.request = (this.stripeRequest == null) this.request = (this.stripeRequest == null)
? new CompactionRequest(new ArrayList<>()) : this.stripeRequest.getRequest(); ? new CompactionRequestImpl(new ArrayList<>()) : this.stripeRequest.getRequest();
return this.stripeRequest != null; return this.stripeRequest != null;
} }
@Override @Override
public void forceSelect(CompactionRequest request) { public void forceSelect(CompactionRequestImpl request) {
super.forceSelect(request); super.forceSelect(request);
if (this.stripeRequest != null) { if (this.stripeRequest != null) {
this.stripeRequest.setRequest(this.request); this.stripeRequest.setRequest(this.request);

View File

@ -36,7 +36,7 @@ import org.apache.yetus.audience.InterfaceAudience;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class CompactionContext { public abstract class CompactionContext {
protected CompactionRequest request = null; protected CompactionRequestImpl request = null;
/** /**
* Called before coprocessor preCompactSelection and should filter the candidates * Called before coprocessor preCompactSelection and should filter the candidates
@ -61,14 +61,14 @@ public abstract class CompactionContext {
* Forces external selection to be applied for this compaction. * Forces external selection to be applied for this compaction.
* @param request The pre-cooked request with selection and other settings. * @param request The pre-cooked request with selection and other settings.
*/ */
public void forceSelect(CompactionRequest request) { public void forceSelect(CompactionRequestImpl request) {
this.request = request; this.request = request;
} }
public abstract List<Path> compact(ThroughputController throughputController, User user) public abstract List<Path> compact(ThroughputController throughputController, User user)
throws IOException; throws IOException;
public CompactionRequest getRequest() { public CompactionRequestImpl getRequest() {
assert hasSelection(); assert hasSelection();
return this.request; return this.request;
} }

View File

@ -62,8 +62,6 @@ public abstract class CompactionPolicy {
this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo); this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
} }
/** /**
* @return The current compaction configuration settings. * @return The current compaction configuration settings.
*/ */

View File

@ -18,142 +18,51 @@
*/ */
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import java.util.Collection;
/** /**
* This class holds all logical details necessary to run a compaction. * Coprocessors use this interface to get details about compaction.
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
public class CompactionRequest { public interface CompactionRequest {
// was this compaction promoted to an off-peak
private boolean isOffPeak = false;
private enum DisplayCompactionType { MINOR, ALL_FILES, MAJOR }
private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;
private int priority = NO_PRIORITY;
private Collection<HStoreFile> filesToCompact;
// CompactRequest object creation time.
private long selectionTime;
// System time used to compare objects in FIFO order. TODO: maybe use selectionTime?
private long timeInNanos;
private String regionName = "";
private String storeName = "";
private long totalSize = -1L;
private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
public CompactionRequest(Collection<HStoreFile> files) {
this.selectionTime = EnvironmentEdgeManager.currentTime();
this.timeInNanos = System.nanoTime();
this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not null");
recalculateSize();
}
public void updateFiles(Collection<HStoreFile> files) {
this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not null");
recalculateSize();
}
public Collection<HStoreFile> getFiles() {
return this.filesToCompact;
}
/** /**
* Sets the region/store name, for logging. * @return unmodifiable collection of StoreFiles in compaction
*/ */
public void setDescription(String regionName, String storeName) { Collection<? extends StoreFile> getFiles();
this.regionName = regionName;
this.storeName = storeName;
}
/** Gets the total size of all StoreFiles in compaction */
public long getSize() {
return totalSize;
}
public boolean isAllFiles() {
return this.isMajor == DisplayCompactionType.MAJOR
|| this.isMajor == DisplayCompactionType.ALL_FILES;
}
public boolean isMajor() {
return this.isMajor == DisplayCompactionType.MAJOR;
}
/** Gets the priority for the request */
public int getPriority() {
return priority;
}
/** Sets the priority for the request */
public void setPriority(int p) {
this.priority = p;
}
public boolean isOffPeak() {
return this.isOffPeak;
}
public void setOffPeak(boolean value) {
this.isOffPeak = value;
}
public long getSelectionTime() {
return this.selectionTime;
}
public long getSelectionNanoTime() {
return this.timeInNanos;
}
/** /**
* Specify if this compaction should be a major compaction based on the state of the store * @return total size of all StoreFiles in compaction
* @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
* compaction
*/ */
public void setIsMajor(boolean isMajor, boolean isAllFiles) { long getSize();
assert isAllFiles || !isMajor;
this.isMajor = !isAllFiles ? DisplayCompactionType.MINOR
: (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
}
public void setTracker(CompactionLifeCycleTracker tracker) {
this.tracker = tracker;
}
public CompactionLifeCycleTracker getTracker() {
return tracker;
}
@Override
public String toString() {
String fsList = filesToCompact.stream().filter(f -> f.getReader() != null)
.map(f -> TraditionalBinaryPrefix.long2String(f.getReader().length(), "", 1))
.collect(Collectors.joining(", "));
return "regionName=" + regionName + ", storeName=" + storeName + ", fileCount=" +
this.getFiles().size() + ", fileSize=" +
TraditionalBinaryPrefix.long2String(totalSize, "", 1) +
((fsList.isEmpty()) ? "" : " (" + fsList + ")") + ", priority=" + priority + ", time=" +
timeInNanos;
}
/** /**
* Recalculate the size of the compaction based on current files. * @return <code>true</code> if major compaction or all files are compacted
* @param files files that should be included in the compaction
*/ */
private void recalculateSize() { boolean isAllFiles();
this.totalSize = filesToCompact.stream().map(HStoreFile::getReader)
.mapToLong(r -> r != null ? r.length() : 0L).sum(); /**
} * @return <code>true</code> if major compaction
*/
boolean isMajor();
/**
* @return priority of compaction request
*/
int getPriority();
/**
* @return <code>true</code> if compaction is Off-peak
*/
boolean isOffPeak();
/**
* @return compaction request creation time in milliseconds
*/
long getSelectionTime();
} }

View File

@ -0,0 +1,159 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
/**
* This class holds all logical details necessary to run a compaction.
*/
@InterfaceAudience.Private
public class CompactionRequestImpl implements CompactionRequest {
// was this compaction promoted to an off-peak
private boolean isOffPeak = false;
private enum DisplayCompactionType { MINOR, ALL_FILES, MAJOR }
private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;
private int priority = NO_PRIORITY;
private Collection<HStoreFile> filesToCompact;
// CompactRequest object creation time.
private long selectionTime;
private String regionName = "";
private String storeName = "";
private long totalSize = -1L;
private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
public CompactionRequestImpl(Collection<HStoreFile> files) {
this.selectionTime = EnvironmentEdgeManager.currentTime();
this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not null");
recalculateSize();
}
public void updateFiles(Collection<HStoreFile> files) {
this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not null");
recalculateSize();
}
@Override
public Collection<HStoreFile> getFiles() {
return Collections.unmodifiableCollection(this.filesToCompact);
}
/**
* Sets the region/store name, for logging.
*/
public void setDescription(String regionName, String storeName) {
this.regionName = regionName;
this.storeName = storeName;
}
/** Gets the total size of all StoreFiles in compaction */
@Override
public long getSize() {
return totalSize;
}
@Override
public boolean isAllFiles() {
return this.isMajor == DisplayCompactionType.MAJOR
|| this.isMajor == DisplayCompactionType.ALL_FILES;
}
@Override
public boolean isMajor() {
return this.isMajor == DisplayCompactionType.MAJOR;
}
/** Gets the priority for the request */
@Override
public int getPriority() {
return priority;
}
/** Sets the priority for the request */
public void setPriority(int p) {
this.priority = p;
}
@Override
public boolean isOffPeak() {
return this.isOffPeak;
}
public void setOffPeak(boolean value) {
this.isOffPeak = value;
}
@Override
public long getSelectionTime() {
return this.selectionTime;
}
/**
* Specify if this compaction should be a major compaction based on the state of the store
* @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
* compaction
*/
public void setIsMajor(boolean isMajor, boolean isAllFiles) {
assert isAllFiles || !isMajor;
this.isMajor = !isAllFiles ? DisplayCompactionType.MINOR
: (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
}
public void setTracker(CompactionLifeCycleTracker tracker) {
this.tracker = tracker;
}
public CompactionLifeCycleTracker getTracker() {
return tracker;
}
@Override
public String toString() {
String fsList = filesToCompact.stream().filter(f -> f.getReader() != null)
.map(f -> TraditionalBinaryPrefix.long2String(f.getReader().length(), "", 1))
.collect(Collectors.joining(", "));
return "regionName=" + regionName + ", storeName=" + storeName + ", fileCount=" +
this.getFiles().size() + ", fileSize=" +
TraditionalBinaryPrefix.long2String(totalSize, "", 1) +
((fsList.isEmpty()) ? "" : " (" + fsList + ")") + ", priority=" + priority + ", time=" +
selectionTime;
}
/**
* Recalculate the size of the compaction based on current files.
*/
private void recalculateSize() {
this.totalSize = filesToCompact.stream().map(HStoreFile::getReader)
.mapToLong(r -> r != null ? r.length() : 0L).sum();
}
}

View File

@ -229,7 +229,7 @@ public abstract class Compactor<T extends CellSink> {
protected interface InternalScannerFactory { protected interface InternalScannerFactory {
ScanType getScanType(CompactionRequest request); ScanType getScanType(CompactionRequestImpl request);
InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType, InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException; FileDetails fd, long smallestReadPoint) throws IOException;
@ -238,7 +238,7 @@ public abstract class Compactor<T extends CellSink> {
protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() { protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {
@Override @Override
public ScanType getScanType(CompactionRequest request) { public ScanType getScanType(CompactionRequestImpl request) {
return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
: ScanType.COMPACT_RETAIN_DELETES; : ScanType.COMPACT_RETAIN_DELETES;
} }
@ -267,7 +267,7 @@ public abstract class Compactor<T extends CellSink> {
/* includesTags = */fd.maxTagsLength > 0, shouldDropBehind); /* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
} }
protected List<Path> compact(final CompactionRequest request, protected List<Path> compact(final CompactionRequestImpl request,
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory, InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
ThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles()); FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
@ -325,8 +325,8 @@ public abstract class Compactor<T extends CellSink> {
return commitWriter(writer, fd, request); return commitWriter(writer, fd, request);
} }
protected abstract List<Path> commitWriter(T writer, FileDetails fd, CompactionRequest request) protected abstract List<Path> commitWriter(T writer, FileDetails fd,
throws IOException; CompactionRequestImpl request) throws IOException;
protected abstract void abortWriter(T writer) throws IOException; protected abstract void abortWriter(T writer) throws IOException;
@ -340,14 +340,14 @@ public abstract class Compactor<T extends CellSink> {
* @param readPoint the read point to help create scanner by Coprocessor if required. * @param readPoint the read point to help create scanner by Coprocessor if required.
* @return Scanner override by coprocessor; null if not overriding. * @return Scanner override by coprocessor; null if not overriding.
*/ */
protected InternalScanner preCreateCoprocScanner(CompactionRequest request, ScanType scanType, protected InternalScanner preCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType,
long earliestPutTs, List<StoreFileScanner> scanners, User user, long readPoint) long earliestPutTs, List<StoreFileScanner> scanners, User user, long readPoint)
throws IOException { throws IOException {
if (store.getCoprocessorHost() == null) { if (store.getCoprocessorHost() == null) {
return null; return null;
} }
return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType, return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
earliestPutTs, request.getTracker(), user, readPoint); earliestPutTs, request.getTracker(), request, user, readPoint);
} }
/** /**
@ -357,13 +357,13 @@ public abstract class Compactor<T extends CellSink> {
* @param scanner The default scanner created for compaction. * @param scanner The default scanner created for compaction.
* @return Scanner scanner to use (usually the default); null if compaction should not proceed. * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
*/ */
protected InternalScanner postCreateCoprocScanner(CompactionRequest request, ScanType scanType, protected InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType,
InternalScanner scanner, User user) throws IOException { InternalScanner scanner, User user) throws IOException {
if (store.getCoprocessorHost() == null) { if (store.getCoprocessorHost() == null) {
return scanner; return scanner;
} }
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(), return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
user); request, user);
} }
/** /**

View File

@ -186,9 +186,9 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
} }
@Override @Override
protected CompactionRequest createCompactionRequest(ArrayList<HStoreFile> candidateSelection, protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
CompactionRequest result = tryingMajor ? selectMajorCompaction(candidateSelection) CompactionRequestImpl result = tryingMajor ? selectMajorCompaction(candidateSelection)
: selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck); : selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Generated compaction request: " + result); LOG.debug("Generated compaction request: " + result);
@ -196,7 +196,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
return result; return result;
} }
public CompactionRequest selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) { public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
long now = EnvironmentEdgeManager.currentTime(); long now = EnvironmentEdgeManager.currentTime();
return new DateTieredCompactionRequest(candidateSelection, return new DateTieredCompactionRequest(candidateSelection,
this.getCompactBoundariesForMajor(candidateSelection, now)); this.getCompactBoundariesForMajor(candidateSelection, now));
@ -210,7 +210,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
* by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order * by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order
* data into the same compaction windows, guaranteeing contiguous compaction based on sequence id. * data into the same compaction windows, guaranteeing contiguous compaction based on sequence id.
*/ */
public CompactionRequest selectMinorCompaction(ArrayList<HStoreFile> candidateSelection, public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
long now = EnvironmentEdgeManager.currentTime(); long now = EnvironmentEdgeManager.currentTime();
long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now); long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now);
@ -261,7 +261,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
} }
} }
// A non-null file list is expected by HStore // A non-null file list is expected by HStore
return new CompactionRequest(Collections.emptyList()); return new CompactionRequestImpl(Collections.emptyList());
} }
private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles, private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,

View File

@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.regionserver.HStoreFile;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS",
justification="It is intended to use the same equal method as superclass") justification="It is intended to use the same equal method as superclass")
public class DateTieredCompactionRequest extends CompactionRequest { public class DateTieredCompactionRequest extends CompactionRequestImpl {
private List<Long> boundaries; private List<Long> boundaries;
public DateTieredCompactionRequest(Collection<HStoreFile> files, List<Long> boundaryList) { public DateTieredCompactionRequest(Collection<HStoreFile> files, List<Long> boundaryList) {

View File

@ -45,7 +45,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
super(conf, store); super(conf, store);
} }
private boolean needEmptyFile(CompactionRequest request) { private boolean needEmptyFile(CompactionRequestImpl request) {
// if we are going to compact the last N files, then we need to emit an empty file to retain the // if we are going to compact the last N files, then we need to emit an empty file to retain the
// maxSeqId if we haven't written out anything. // maxSeqId if we haven't written out anything.
OptionalLong maxSeqId = StoreUtils.getMaxSequenceIdInList(request.getFiles()); OptionalLong maxSeqId = StoreUtils.getMaxSequenceIdInList(request.getFiles());
@ -54,7 +54,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
maxSeqId.getAsLong() == storeMaxSeqId.getAsLong(); maxSeqId.getAsLong() == storeMaxSeqId.getAsLong();
} }
public List<Path> compact(final CompactionRequest request, final List<Long> lowerBoundaries, public List<Path> compact(final CompactionRequestImpl request, final List<Long> lowerBoundaries,
ThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + lowerBoundaries.size() LOG.debug("Executing compaction with " + lowerBoundaries.size()
@ -77,7 +77,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
@Override @Override
protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd, protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException { CompactionRequestImpl request) throws IOException {
return writer.commitWriters(fd.maxSeqId, request.isAllFiles()); return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
} }
} }

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
/** /**
* Compact passed set of files. Create an instance and then call * Compact passed set of files. Create an instance and then call
* {@link #compact(CompactionRequest, ThroughputController, User)} * {@link #compact(CompactionRequestImpl, ThroughputController, User)}
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class DefaultCompactor extends Compactor<StoreFileWriter> { public class DefaultCompactor extends Compactor<StoreFileWriter> {
@ -61,16 +61,16 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
/** /**
* Do a minor/major compaction on an explicit set of storefiles from a Store. * Do a minor/major compaction on an explicit set of storefiles from a Store.
*/ */
public List<Path> compact(final CompactionRequest request, public List<Path> compact(final CompactionRequestImpl request,
ThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
return compact(request, defaultScannerFactory, writerFactory, throughputController, user); return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
} }
/** /**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to * Compact a list of files for testing. Creates a fake {@link CompactionRequestImpl} to pass to
* {@link #compact(CompactionRequest, ThroughputController, User)}; * {@link #compact(CompactionRequestImpl, ThroughputController, User)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for the * @param filesToCompact the files to compact. These are used as the compactionSelection for the
* generated {@link CompactionRequest}. * generated {@link CompactionRequestImpl}.
* @param isMajor true to major compact (prune all deletes, max versions, etc) * @param isMajor true to major compact (prune all deletes, max versions, etc)
* @return Product of compaction or an empty list if all cells expired or deleted and nothing \ * @return Product of compaction or an empty list if all cells expired or deleted and nothing \
* made it through the compaction. * made it through the compaction.
@ -78,14 +78,14 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
*/ */
public List<Path> compactForTesting(Collection<HStoreFile> filesToCompact, boolean isMajor) public List<Path> compactForTesting(Collection<HStoreFile> filesToCompact, boolean isMajor)
throws IOException { throws IOException {
CompactionRequest cr = new CompactionRequest(filesToCompact); CompactionRequestImpl cr = new CompactionRequestImpl(filesToCompact);
cr.setIsMajor(isMajor, isMajor); cr.setIsMajor(isMajor, isMajor);
return compact(cr, NoLimitThroughputController.INSTANCE, null); return compact(cr, NoLimitThroughputController.INSTANCE, null);
} }
@Override @Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd, protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException { CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath()); List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
writer.close(); writer.close();

View File

@ -55,7 +55,7 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
} }
@Override @Override
public CompactionRequest selectCompaction(Collection<HStoreFile> candidateFiles, public CompactionRequestImpl selectCompaction(Collection<HStoreFile> candidateFiles,
List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak, List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
boolean forceMajor) throws IOException { boolean forceMajor) throws IOException {
if(forceMajor){ if(forceMajor){
@ -70,7 +70,7 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
// Nothing to compact // Nothing to compact
Collection<HStoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting); Collection<HStoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
CompactionRequest result = new CompactionRequest(toCompact); CompactionRequestImpl result = new CompactionRequestImpl(toCompact);
return result; return result;
} }
@ -123,7 +123,7 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
if (maxTtl == Long.MAX_VALUE if (maxTtl == Long.MAX_VALUE
|| (currentTime - maxTtl < maxTs)){ || (currentTime - maxTtl < maxTs)){
continue; continue;
} else if(filesCompacting == null || filesCompacting.contains(sf) == false){ } else if(filesCompacting == null || !filesCompacting.contains(sf)){
expiredStores.add(sf); expiredStores.add(sf);
} }
} }

View File

@ -113,7 +113,7 @@ public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
} }
@Override @Override
protected CompactionRequest createCompactionRequest(ArrayList<HStoreFile> protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile>
candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck)
throws IOException { throws IOException {
if (!tryingMajor) { if (!tryingMajor) {
@ -122,7 +122,7 @@ public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
candidateSelection = checkMinFilesCriteria(candidateSelection, candidateSelection = checkMinFilesCriteria(candidateSelection,
comConf.getMinFilesToCompact()); comConf.getMinFilesToCompact());
} }
return new CompactionRequest(candidateSelection); return new CompactionRequestImpl(candidateSelection);
} }
/** /**

View File

@ -53,7 +53,7 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy {
* on seqId for data consistency. * on seqId for data consistency.
* @return subset copy of candidate list that meets compaction criteria * @return subset copy of candidate list that meets compaction criteria
*/ */
public CompactionRequest selectCompaction(Collection<HStoreFile> candidateFiles, public CompactionRequestImpl selectCompaction(Collection<HStoreFile> candidateFiles,
List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak, List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
boolean forceMajor) throws IOException { boolean forceMajor) throws IOException {
// Preliminary compaction subject to filters // Preliminary compaction subject to filters
@ -85,7 +85,7 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy {
// Or, if there are any references among the candidates. // Or, if there are any references among the candidates.
boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection); boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
CompactionRequest result = createCompactionRequest(candidateSelection, CompactionRequestImpl result = createCompactionRequest(candidateSelection,
isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck); isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
ArrayList<HStoreFile> filesToCompact = Lists.newArrayList(result.getFiles()); ArrayList<HStoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
@ -99,7 +99,7 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy {
return result; return result;
} }
protected abstract CompactionRequest createCompactionRequest( protected abstract CompactionRequestImpl createCompactionRequest(
ArrayList<HStoreFile> candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, ArrayList<HStoreFile> candidateSelection, boolean tryingMajor, boolean mayUseOffPeak,
boolean mayBeStuck) throws IOException; boolean mayBeStuck) throws IOException;

View File

@ -74,7 +74,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
} }
public StripeCompactionRequest createEmptyRequest( public StripeCompactionRequest createEmptyRequest(
StripeInformationProvider si, CompactionRequest request) { StripeInformationProvider si, CompactionRequestImpl request) {
// Treat as L0-ish compaction with fixed set of files, and hope for the best. // Treat as L0-ish compaction with fixed set of files, and hope for the best.
if (si.getStripeCount() > 0) { if (si.getStripeCount() > 0) {
return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries()); return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
@ -376,7 +376,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
/** Stripe compaction request wrapper. */ /** Stripe compaction request wrapper. */
public abstract static class StripeCompactionRequest { public abstract static class StripeCompactionRequest {
protected CompactionRequest request; protected CompactionRequestImpl request;
protected byte[] majorRangeFromRow = null, majorRangeToRow = null; protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
public List<Path> execute(StripeCompactor compactor, public List<Path> execute(StripeCompactor compactor,
@ -392,7 +392,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
public abstract List<Path> execute(StripeCompactor compactor, public abstract List<Path> execute(StripeCompactor compactor,
ThroughputController throughputController, User user) throws IOException; ThroughputController throughputController, User user) throws IOException;
public StripeCompactionRequest(CompactionRequest request) { public StripeCompactionRequest(CompactionRequestImpl request) {
this.request = request; this.request = request;
} }
@ -407,11 +407,11 @@ public class StripeCompactionPolicy extends CompactionPolicy {
this.majorRangeToRow = endRow; this.majorRangeToRow = endRow;
} }
public CompactionRequest getRequest() { public CompactionRequestImpl getRequest() {
return this.request; return this.request;
} }
public void setRequest(CompactionRequest request) { public void setRequest(CompactionRequestImpl request) {
assert request != null; assert request != null;
this.request = request; this.request = request;
this.majorRangeFromRow = this.majorRangeToRow = null; this.majorRangeFromRow = this.majorRangeToRow = null;
@ -429,7 +429,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
* @param request Original request. * @param request Original request.
* @param targetBoundaries New files should be written with these boundaries. * @param targetBoundaries New files should be written with these boundaries.
*/ */
public BoundaryStripeCompactionRequest(CompactionRequest request, public BoundaryStripeCompactionRequest(CompactionRequestImpl request,
List<byte[]> targetBoundaries) { List<byte[]> targetBoundaries) {
super(request); super(request);
this.targetBoundaries = targetBoundaries; this.targetBoundaries = targetBoundaries;
@ -437,7 +437,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
public BoundaryStripeCompactionRequest(Collection<HStoreFile> files, public BoundaryStripeCompactionRequest(Collection<HStoreFile> files,
List<byte[]> targetBoundaries) { List<byte[]> targetBoundaries) {
this(new CompactionRequest(files), targetBoundaries); this(new CompactionRequestImpl(files), targetBoundaries);
} }
@Override @Override
@ -467,7 +467,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
* @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
* total number of kvs, all the overflow data goes into the last stripe. * total number of kvs, all the overflow data goes into the last stripe.
*/ */
public SplitStripeCompactionRequest(CompactionRequest request, public SplitStripeCompactionRequest(CompactionRequestImpl request,
byte[] startRow, byte[] endRow, int targetCount, long targetKvs) { byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
super(request); super(request);
this.startRow = startRow; this.startRow = startRow;
@ -483,7 +483,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
public SplitStripeCompactionRequest(Collection<HStoreFile> files, public SplitStripeCompactionRequest(Collection<HStoreFile> files,
byte[] startRow, byte[] endRow, int targetCount, long targetKvs) { byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
this(new CompactionRequest(files), startRow, endRow, targetCount, targetKvs); this(new CompactionRequestImpl(files), startRow, endRow, targetCount, targetKvs);
} }
@Override @Override

View File

@ -58,7 +58,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
} }
@Override @Override
public ScanType getScanType(CompactionRequest request) { public ScanType getScanType(CompactionRequestImpl request) {
// If majorRangeFromRow and majorRangeToRow are not null, then we will not use the return // If majorRangeFromRow and majorRangeToRow are not null, then we will not use the return
// value to create InternalScanner. See the createScanner method below. The return value is // value to create InternalScanner. See the createScanner method below. The return value is
// also used when calling coprocessor hooks. // also used when calling coprocessor hooks.
@ -76,7 +76,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
} }
} }
public List<Path> compact(CompactionRequest request, final List<byte[]> targetBoundaries, public List<Path> compact(CompactionRequestImpl request, final List<byte[]> targetBoundaries,
final byte[] majorRangeFromRow, final byte[] majorRangeToRow, final byte[] majorRangeFromRow, final byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -101,7 +101,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
}, throughputController, user); }, throughputController, user);
} }
public List<Path> compact(CompactionRequest request, final int targetCount, final long targetSize, public List<Path> compact(CompactionRequestImpl request, final int targetCount, final long targetSize,
final byte[] left, final byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, final byte[] left, final byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -125,7 +125,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
@Override @Override
protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd, protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException { CompactionRequestImpl request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor()); List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata."; assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
return newFiles; return newFiles;

View File

@ -113,6 +113,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AccessDeniedException;
@ -1572,8 +1573,8 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
throws IOException { CompactionRequest request) throws IOException {
requirePermission(getActiveUser(c), "compact", getTableName(c.getEnvironment()), null, null, requirePermission(getActiveUser(c), "compact", getTableName(c.getEnvironment()), null, null,
Action.ADMIN, Action.CREATE); Action.ADMIN, Action.CREATE);
return scanner; return scanner;

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -265,7 +266,8 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
@Override @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker request, long readPoint) long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker,
CompactionRequest request, long readPoint)
throws IOException { throws IOException {
return createCompactorScanner((HStore) store, scanners, scanType, earliestPutTs); return createCompactorScanner((HStore) store, scanners, scanType, earliestPutTs);
} }

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
@ -202,20 +203,22 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
@Override @Override
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException { List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
ctPreCompactSelect.incrementAndGet(); ctPreCompactSelect.incrementAndGet();
} }
@Override @Override
public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ImmutableList<? extends StoreFile> selected, CompactionLifeCycleTracker tracker) { ImmutableList<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
CompactionRequest request) {
ctPostCompactSelect.incrementAndGet(); ctPostCompactSelect.incrementAndGet();
} }
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
throws IOException { CompactionRequest request) throws IOException {
ctPreCompact.incrementAndGet(); ctPreCompact.incrementAndGet();
return scanner; return scanner;
} }
@ -223,14 +226,16 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
@Override @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException { InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
ctPreCompactScanner.incrementAndGet(); ctPreCompactScanner.incrementAndGet();
return s; return s;
} }
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException { StoreFile resultFile, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
ctPostCompact.incrementAndGet(); ctPostCompact.incrementAndGet();
} }

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Rule; import org.junit.Rule;
@ -201,13 +202,15 @@ public class TestCoprocessorInterface {
} }
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) { Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) {
preCompactCalled = true; preCompactCalled = true;
return scanner; return scanner;
} }
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker) { Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker,
CompactionRequest request) {
postCompactCalled = true; postCompactCalled = true;
} }
@Override @Override

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
@ -423,7 +424,8 @@ public class TestRegionObserverInterface {
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) { InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) {
return new InternalScanner() { return new InternalScanner() {
@Override @Override
public boolean next(List<Cell> results) throws IOException { public boolean next(List<Cell> results) throws IOException {
@ -462,7 +464,7 @@ public class TestRegionObserverInterface {
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
StoreFile resultFile, CompactionLifeCycleTracker tracker) { StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) {
lastCompaction = EnvironmentEdgeManager.currentTime(); lastCompaction = EnvironmentEdgeManager.currentTime();
} }

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
@ -177,7 +178,8 @@ public class TestRegionObserverScannerOpenHook {
@Override @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker,
CompactionRequest request, long readPoint)
throws IOException { throws IOException {
scanners.forEach(KeyValueScanner::close); scanners.forEach(KeyValueScanner::close);
return NO_DATA; return NO_DATA;

View File

@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -730,7 +731,8 @@ public class TestMobCompactor {
@Override @Override
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker) List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker,
CompactionRequest request)
throws IOException { throws IOException {
int count = candidates.size(); int count = candidates.size();
if (count >= 2) { if (count >= 2) {

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -465,7 +466,8 @@ public class TestNamespaceAuditor {
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException { StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
throws IOException {
postCompact.countDown(); postCompact.countDown();
} }

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
/** /**
* RegionObserver that just reimplements the default behavior, * RegionObserver that just reimplements the default behavior,
@ -69,7 +70,8 @@ public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver
public InternalScanner preCompactScannerOpen( public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException { InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
HStore hs = (HStore) store; HStore hs = (HStore) store;
// this demonstrates how to override the scanners default behavior // this demonstrates how to override the scanners default behavior
ScanInfo oldSI = hs.getScanInfo(); ScanInfo oldSI = hs.getScanInfo();

View File

@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
@ -415,7 +415,7 @@ public class TestCompaction {
@Override @Override
public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction, public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
boolean mayUseOffPeak, boolean forceMajor) throws IOException { boolean mayUseOffPeak, boolean forceMajor) throws IOException {
this.request = new CompactionRequest(selectedFiles); this.request = new CompactionRequestImpl(selectedFiles);
this.request.setPriority(getPriority()); this.request.setPriority(getPriority());
return true; return true;
} }
@ -496,7 +496,7 @@ public class TestCompaction {
@Override @Override
public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e) public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
throws IOException { throws IOException {
this.request = new CompactionRequest(new ArrayList<>()); this.request = new CompactionRequestImpl(new ArrayList<>());
return true; return true;
} }
} }

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -192,7 +192,7 @@ public class TestCompactionPolicy {
long... expected) throws IOException { long... expected) throws IOException {
store.forceMajor = forcemajor; store.forceMajor = forcemajor;
// Test Default compactions // Test Default compactions
CompactionRequest result = CompactionRequestImpl result =
((RatioBasedCompactionPolicy) store.storeEngine.getCompactionPolicy()).selectCompaction( ((RatioBasedCompactionPolicy) store.storeEngine.getCompactionPolicy()).selectCompaction(
candidates, new ArrayList<>(), false, isOffPeak, forcemajor); candidates, new ArrayList<>(), false, isOffPeak, forcemajor);
List<HStoreFile> actual = new ArrayList<>(result.getFiles()); List<HStoreFile> actual = new ArrayList<>(result.getFiles());

View File

@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -173,7 +173,7 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy {
} }
} }
// Test Default compactions // Test Default compactions
CompactionRequest result = ((RatioBasedCompactionPolicy) store.storeEngine CompactionRequestImpl result = ((RatioBasedCompactionPolicy) store.storeEngine
.getCompactionPolicy()).selectCompaction(candidates, .getCompactionPolicy()).selectCompaction(candidates,
new ArrayList<>(), false, false, false); new ArrayList<>(), false, false, false);
Assert.assertTrue(result.getFiles().isEmpty()); Assert.assertTrue(result.getFiles().isEmpty());

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -262,7 +263,8 @@ public class TestHRegionServerBulkLoad {
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request)
throws IOException { throws IOException {
try { try {
Thread.sleep(sleepDuration); Thread.sleep(sleepDuration);

View File

@ -55,7 +55,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -417,7 +417,7 @@ public class TestMajorCompaction {
} }
store.triggerMajorCompaction(); store.triggerMajorCompaction();
CompactionRequest request = store.requestCompaction().get().getRequest(); CompactionRequestImpl request = store.requestCompaction().get().getRequest();
assertNotNull("Expected to receive a compaction request", request); assertNotNull("Expected to receive a compaction request", request);
assertEquals( assertEquals(
"System-requested major compaction should not occur if there are too many store files", "System-requested major compaction should not occur if there are too many store files",
@ -436,7 +436,7 @@ public class TestMajorCompaction {
createStoreFile(r); createStoreFile(r);
} }
store.triggerMajorCompaction(); store.triggerMajorCompaction();
CompactionRequest request = CompactionRequestImpl request =
store.requestCompaction(PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null).get() store.requestCompaction(PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null).get()
.getRequest(); .getRequest();
assertNotNull("Expected to receive a compaction request", request); assertNotNull("Expected to receive a compaction request", request);

View File

@ -37,7 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
@ -76,7 +76,7 @@ public class TestStripeStoreEngine {
StripeCompactor mockCompactor = mock(StripeCompactor.class); StripeCompactor mockCompactor = mock(StripeCompactor.class);
se.setCompactorOverride(mockCompactor); se.setCompactorOverride(mockCompactor);
when( when(
mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class), mockCompactor.compact(any(CompactionRequestImpl.class), anyInt(), anyLong(), any(byte[].class),
any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class),
any(ThroughputController.class), any(User.class))) any(ThroughputController.class), any(User.class)))
.thenReturn(new ArrayList<>()); .thenReturn(new ArrayList<>());
@ -92,7 +92,7 @@ public class TestStripeStoreEngine {
// Override the file list. Granted, overriding this compaction in this manner will // Override the file list. Granted, overriding this compaction in this manner will
// break things in real world, but we only want to verify the override. // break things in real world, but we only want to verify the override.
compactUs.remove(sf); compactUs.remove(sf);
CompactionRequest req = new CompactionRequest(compactUs); CompactionRequestImpl req = new CompactionRequestImpl(compactUs);
compaction.forceSelect(req); compaction.forceSelect(req);
assertEquals(2, compaction.getRequest().getFiles().size()); assertEquals(2, compaction.getRequest().getFiles().size());
assertFalse(compaction.getRequest().getFiles().contains(sf)); assertFalse(compaction.getRequest().getFiles().contains(sf));

View File

@ -169,7 +169,7 @@ public class PerfTestCompactionPolicies extends MockStoreFileGenerator {
private List<HStoreFile> runIteration(List<HStoreFile> startingStoreFiles) throws IOException { private List<HStoreFile> runIteration(List<HStoreFile> startingStoreFiles) throws IOException {
List<HStoreFile> storeFiles = new ArrayList<>(startingStoreFiles); List<HStoreFile> storeFiles = new ArrayList<>(startingStoreFiles);
CompactionRequest req = cp.selectCompaction( CompactionRequestImpl req = cp.selectCompaction(
storeFiles, new ArrayList<>(), false, false, false); storeFiles, new ArrayList<>(), false, false, false);
long newFileSize = 0; long newFileSize = 0;

View File

@ -71,8 +71,8 @@ public class TestCompactor {
return sf; return sf;
} }
public static CompactionRequest createDummyRequest() throws Exception { public static CompactionRequestImpl createDummyRequest() throws Exception {
return new CompactionRequest(Arrays.asList(createDummyStoreFile(1L))); return new CompactionRequestImpl(Arrays.asList(createDummyStoreFile(1L)));
} }
// StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted. // StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted.

View File

@ -130,7 +130,7 @@ public class TestDateTieredCompactor {
HStoreFile sf1 = createDummyStoreFile(1L); HStoreFile sf1 = createDummyStoreFile(1L);
HStoreFile sf2 = createDummyStoreFile(2L); HStoreFile sf2 = createDummyStoreFile(2L);
DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2)); DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
List<Path> paths = dtc.compact(new CompactionRequest(Arrays.asList(sf1)), List<Path> paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)),
boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null); boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null);
writers.verifyKvs(output, allFiles, boundaries); writers.verifyKvs(output, allFiles, boundaries);
if (allFiles) { if (allFiles) {
@ -156,7 +156,7 @@ public class TestDateTieredCompactor {
@Test @Test
public void testEmptyOutputFile() throws Exception { public void testEmptyOutputFile() throws Exception {
StoreFileWritersCapture writers = new StoreFileWritersCapture(); StoreFileWritersCapture writers = new StoreFileWritersCapture();
CompactionRequest request = createDummyRequest(); CompactionRequestImpl request = createDummyRequest();
DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0], DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0],
new ArrayList<>(request.getFiles())); new ArrayList<>(request.getFiles()));
List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE), List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),

View File

@ -230,7 +230,9 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al())); assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
assertEquals(si.getStorefiles(), scr.getRequest().getFiles()); // UnmodifiableCollection does not implement equals so we need to change it here to a
// collection that implements it.
assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles()));
scr.execute(sc, NoLimitThroughputController.INSTANCE, null); scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),

View File

@ -874,7 +874,7 @@ public class TestAccessController extends SecureTestUtil {
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {
ACCESS_CONTROLLER.preCompact(ObserverContext.createAndPrepare(RCP_ENV, null), null, null, ACCESS_CONTROLLER.preCompact(ObserverContext.createAndPrepare(RCP_ENV, null), null, null,
ScanType.COMPACT_RETAIN_DELETES, null); ScanType.COMPACT_RETAIN_DELETES, null, null);
return null; return null;
} }
}; };

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
@ -272,7 +273,8 @@ public class TestCoprocessorScanPolicy {
public InternalScanner preCompactScannerOpen( public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s,CompactionLifeCycleTracker tracker, long readPoint) throws IOException { InternalScanner s,CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
HStore hs = (HStore) store; HStore hs = (HStore) store;
Long newTtl = ttls.get(store.getTableName()); Long newTtl = ttls.get(store.getTableName());
Integer newVersions = versions.get(store.getTableName()); Integer newVersions = versions.get(store.getTableName());