HBASE-18815 We need to pass something like CompactionRequest in CP to give user some information about the compaction

CompactionRequest was removed from CP in HBASE-18453, this change reintroduces
CompatcionRequest to CP as a read-only interface called CompactionRequest.
The CompactionRequest class is renamed to CompactionRequestImpl.

Additionally, this change removes selectionTimeInNanos from CompactionRequest and
uses selectionTime as a replacement. This means that CompactionRequest:toString
is modified and compare as well.

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Peter Somogyi 2017-09-29 15:25:25 -07:00 committed by Michael Stack
parent 3318e8724e
commit d3a817f212
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
43 changed files with 382 additions and 262 deletions

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.zookeeper.KeeperException;
@ -228,7 +229,8 @@ public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObs
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
if (scanInfo == null) {
// take default action

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.util.Pair;
@ -187,9 +188,11 @@ public interface RegionObserver {
* @param store the store where compaction is being requested
* @param candidates the store files currently available for compaction
* @param tracker tracker used to track the life cycle of a compaction
* @param request the requested compaction
*/
default void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {}
List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {}
/**
* Called after the {@link StoreFile}s to compact have been selected from the available
@ -198,9 +201,11 @@ public interface RegionObserver {
* @param store the store being compacted
* @param selected the store files selected to compact
* @param tracker tracker used to track the life cycle of a compaction
* @param request the requested compaction
*/
default void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ImmutableList<? extends StoreFile> selected, CompactionLifeCycleTracker tracker) {}
ImmutableList<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
CompactionRequest request) {}
/**
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
@ -221,11 +226,13 @@ public interface RegionObserver {
* @param scanner the scanner over existing data used in the store file rewriting
* @param scanType type of Scan
* @param tracker tracker used to track the life cycle of a compaction
* @param request the requested compaction
* @return the scanner to use during compaction. Should not be {@code null} unless the
* implementation is writing new store files on its own.
*/
default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request)
throws IOException {
return scanner;
}
@ -245,13 +252,15 @@ public interface RegionObserver {
* files
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @param tracker used to track the life cycle of a compaction
* @param request the requested compaction
* @param readPoint the readpoint to create scanner
* @return the scanner to use during compaction. {@code null} if the default implementation is to
* be used.
*/
default InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
return s;
}
@ -261,9 +270,11 @@ public interface RegionObserver {
* @param store the store being compacted
* @param resultFile the new store file written out during compaction
* @param tracker used to track the life cycle of a compaction
* @param request the requested compaction
*/
default void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {}
StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
throws IOException {}
/**
* Called before the region is reported as closed to the master.
@ -320,7 +331,7 @@ public interface RegionObserver {
* coprocessors
* @param c the environment provided by the region server
* @param get the Get request
* @param exists
* @param exists the result returned by the region server
* @return the value to return to the client if bypassing default processing
*/
default boolean preExists(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
@ -799,8 +810,8 @@ public interface RegionObserver {
* <p>
* See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
* and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionLifeCycleTracker, long)} to override scanners created for flushes
* or compactions, resp.
* InternalScanner, CompactionLifeCycleTracker, CompactionRequest, long)} to override scanners
* created for flushes or compactions, resp.
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
@ -41,17 +40,17 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Compact passed set of files in the mob-enabled column family.
@ -66,7 +65,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
@Override
public ScanType getScanType(CompactionRequest request) {
public ScanType getScanType(CompactionRequestImpl request) {
// retain the delete markers until they are expired.
return ScanType.COMPACT_RETAIN_DELETES;
}
@ -105,7 +104,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
}
@Override
public List<Path> compact(CompactionRequest request, ThroughputController throughputController,
public List<Path> compact(CompactionRequestImpl request, ThroughputController throughputController,
User user) throws IOException {
return compact(request, scannerFactory, writerFactory, throughputController, user);
}

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
@ -393,7 +393,7 @@ public class CompactSplit implements PropagatingConfigurationObserver {
private static final Comparator<Runnable> COMPARATOR =
new Comparator<Runnable>() {
private int compare(CompactionRequest r1, CompactionRequest r2) {
private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
if (r1 == r2) {
return 0; //they are the same request
}
@ -402,7 +402,7 @@ public class CompactSplit implements PropagatingConfigurationObserver {
if (cmp != 0) {
return cmp;
}
cmp = Long.compare(r1.getSelectionNanoTime(), r2.getSelectionNanoTime());
cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
if (cmp != 0) {
return cmp;
}

View File

@ -23,9 +23,9 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
@ -81,7 +81,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
}
@Override
public void forceSelect(CompactionRequest request) {
public void forceSelect(CompactionRequestImpl request) {
if (!(request instanceof DateTieredCompactionRequest)) {
throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
+ request.getClass().getCanonicalName());

View File

@ -82,7 +82,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
@ -1349,9 +1349,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
ThroughputController throughputController, User user) throws IOException {
assert compaction != null;
List<HStoreFile> sfs = null;
CompactionRequest cr = compaction.getRequest();
CompactionRequestImpl cr = compaction.getRequest();
try {
// Do all sanity checking in here if we have a valid CompactionRequest
// Do all sanity checking in here if we have a valid CompactionRequestImpl
// because we need to clean up after it on the way out in a finally
// block below
long compactionStartTime = EnvironmentEdgeManager.currentTime();
@ -1387,7 +1387,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
return sfs;
}
// Do the steps necessary to complete the compaction.
sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
if (cr.isMajor()) {
@ -1417,14 +1417,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}
}
private List<HStoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles,
private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr, List<Path> newFiles,
User user) throws IOException {
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
for (Path newFile : newFiles) {
assert newFile != null;
HStoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) {
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), user);
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
}
assert sf != null;
sfs.add(sf);
@ -1483,7 +1483,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* @param compactionStartTime Start time.
*/
private void logCompactionEndMessage(
CompactionRequest cr, List<HStoreFile> sfs, long now, long compactionStartTime) {
CompactionRequestImpl cr, List<HStoreFile> sfs, long now, long compactionStartTime) {
StringBuilder message = new StringBuilder(
"Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
+ cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
@ -1625,7 +1625,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
// Move the compaction into place.
HStoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompact(this, sf, null, null);
this.getCoprocessorHost().postCompact(this, sf, null, null, null);
}
replaceStoreFiles(filesToCompact, Collections.singletonList(sf));
completeCompaction(filesToCompact);
@ -1674,7 +1674,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
removeUnneededFiles();
final CompactionContext compaction = storeEngine.createCompaction();
CompactionRequest request = null;
CompactionRequestImpl request = null;
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
@ -1682,11 +1682,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
if (this.getCoprocessorHost() != null) {
final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
boolean override = false;
//TODO: is it correct way to get CompactionRequest?
override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
tracker, user);
tracker, null, user);
if (override) {
// Coprocessor is overriding normal file selection.
compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));
}
}
@ -1712,7 +1713,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}
if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompactSelection(
this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, user);
this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,
compaction.getRequest(), user);
}
// Finally, we have the resulting files list. Check if we have any files at all.
request = compaction.getRequest();
@ -1790,7 +1792,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
finishCompactionRequest(compaction.getRequest());
}
private void finishCompactionRequest(CompactionRequest cr) {
private void finishCompactionRequest(CompactionRequestImpl cr) {
this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
if (cr.isOffPeak()) {
offPeakCompactionTracker.set(false);

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
@ -544,11 +545,12 @@ public class RegionCoprocessorHost
/**
* See
* {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionLifeCycleTracker, long)}
* InternalScanner, CompactionLifeCycleTracker, CompactionRequest, long)}
*/
public InternalScanner preCompactScannerOpen(final HStore store,
final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
final CompactionLifeCycleTracker tracker, final User user, final long readPoint)
final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user,
final long readPoint)
throws IOException {
return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
new ObserverOperationWithResult<RegionObserver, InternalScanner>(
@ -556,7 +558,7 @@ public class RegionCoprocessorHost
@Override
public InternalScanner call(RegionObserver observer) throws IOException {
return observer.preCompactScannerOpen(this, store, scanners, scanType,
earliestPutTs, getResult(), tracker, readPoint);
earliestPutTs, getResult(), tracker, request, readPoint);
}
});
}
@ -567,15 +569,18 @@ public class RegionCoprocessorHost
* @param store The store where compaction is being requested
* @param candidates The currently available store files
* @param tracker used to track the life cycle of a compaction
* @param request the compaction request
* @param user the user
* @return If {@code true}, skip the normal selection process and use the current list
* @throws IOException
*/
public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
final CompactionLifeCycleTracker tracker, final User user) throws IOException {
final CompactionLifeCycleTracker tracker, final CompactionRequest request,
final User user) throws IOException {
return execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preCompactSelection(this, store, candidates, tracker);
observer.preCompactSelection(this, store, candidates, tracker, request);
}
});
}
@ -586,13 +591,16 @@ public class RegionCoprocessorHost
* @param store The store where compaction is being requested
* @param selected The store files selected to compact
* @param tracker used to track the life cycle of a compaction
* @param request the compaction request
* @param user the user
*/
public void postCompactSelection(final HStore store, final ImmutableList<HStoreFile> selected,
final CompactionLifeCycleTracker tracker, final User user) throws IOException {
final CompactionLifeCycleTracker tracker, final CompactionRequest request,
final User user) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postCompactSelection(this, store, selected, tracker);
observer.postCompactSelection(this, store, selected, tracker, request);
}
});
}
@ -603,17 +611,19 @@ public class RegionCoprocessorHost
* @param scanner the scanner used to read store data during compaction
* @param scanType type of Scan
* @param tracker used to track the life cycle of a compaction
* @param request the compaction request
* @param user the user
* @throws IOException
*/
public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
final ScanType scanType, final CompactionLifeCycleTracker tracker, final User user)
throws IOException {
final ScanType scanType, final CompactionLifeCycleTracker tracker,
final CompactionRequest request, final User user) throws IOException {
return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null :
new ObserverOperationWithResult<RegionObserver, InternalScanner>(
regionObserverGetter, user) {
@Override
public InternalScanner call(RegionObserver observer) throws IOException {
return observer.preCompact(this, store, getResult(), scanType, tracker);
return observer.preCompact(this, store, getResult(), scanType, tracker, request);
}
});
}
@ -623,14 +633,17 @@ public class RegionCoprocessorHost
* @param store the store being compacted
* @param resultFile the new store file written during compaction
* @param tracker used to track the life cycle of a compaction
* @param request the compaction request
* @param user the user
* @throws IOException
*/
public void postCompact(final HStore store, final HStoreFile resultFile,
final CompactionLifeCycleTracker tracker, final User user) throws IOException {
final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postCompact(this, store, resultFile, tracker);
observer.postCompact(this, store, resultFile, tracker, request);
}
});
}

View File

@ -27,9 +27,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@ -84,12 +84,12 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
this.stripeRequest = compactionPolicy.selectCompaction(
storeFileManager, filesCompacting, mayUseOffPeak);
this.request = (this.stripeRequest == null)
? new CompactionRequest(new ArrayList<>()) : this.stripeRequest.getRequest();
? new CompactionRequestImpl(new ArrayList<>()) : this.stripeRequest.getRequest();
return this.stripeRequest != null;
}
@Override
public void forceSelect(CompactionRequest request) {
public void forceSelect(CompactionRequestImpl request) {
super.forceSelect(request);
if (this.stripeRequest != null) {
this.stripeRequest.setRequest(this.request);

View File

@ -36,7 +36,7 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
public abstract class CompactionContext {
protected CompactionRequest request = null;
protected CompactionRequestImpl request = null;
/**
* Called before coprocessor preCompactSelection and should filter the candidates
@ -61,14 +61,14 @@ public abstract class CompactionContext {
* Forces external selection to be applied for this compaction.
* @param request The pre-cooked request with selection and other settings.
*/
public void forceSelect(CompactionRequest request) {
public void forceSelect(CompactionRequestImpl request) {
this.request = request;
}
public abstract List<Path> compact(ThroughputController throughputController, User user)
throws IOException;
public CompactionRequest getRequest() {
public CompactionRequestImpl getRequest() {
assert hasSelection();
return this.request;
}

View File

@ -62,8 +62,6 @@ public abstract class CompactionPolicy {
this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
}
/**
* @return The current compaction configuration settings.
*/

View File

@ -18,142 +18,51 @@
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import java.util.Collection;
/**
* This class holds all logical details necessary to run a compaction.
* Coprocessors use this interface to get details about compaction.
*/
@InterfaceAudience.Private
public class CompactionRequest {
// was this compaction promoted to an off-peak
private boolean isOffPeak = false;
private enum DisplayCompactionType { MINOR, ALL_FILES, MAJOR }
private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;
private int priority = NO_PRIORITY;
private Collection<HStoreFile> filesToCompact;
// CompactRequest object creation time.
private long selectionTime;
// System time used to compare objects in FIFO order. TODO: maybe use selectionTime?
private long timeInNanos;
private String regionName = "";
private String storeName = "";
private long totalSize = -1L;
private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
public CompactionRequest(Collection<HStoreFile> files) {
this.selectionTime = EnvironmentEdgeManager.currentTime();
this.timeInNanos = System.nanoTime();
this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not null");
recalculateSize();
}
public void updateFiles(Collection<HStoreFile> files) {
this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not null");
recalculateSize();
}
public Collection<HStoreFile> getFiles() {
return this.filesToCompact;
}
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
public interface CompactionRequest {
/**
* Sets the region/store name, for logging.
* @return unmodifiable collection of StoreFiles in compaction
*/
public void setDescription(String regionName, String storeName) {
this.regionName = regionName;
this.storeName = storeName;
}
/** Gets the total size of all StoreFiles in compaction */
public long getSize() {
return totalSize;
}
public boolean isAllFiles() {
return this.isMajor == DisplayCompactionType.MAJOR
|| this.isMajor == DisplayCompactionType.ALL_FILES;
}
public boolean isMajor() {
return this.isMajor == DisplayCompactionType.MAJOR;
}
/** Gets the priority for the request */
public int getPriority() {
return priority;
}
/** Sets the priority for the request */
public void setPriority(int p) {
this.priority = p;
}
public boolean isOffPeak() {
return this.isOffPeak;
}
public void setOffPeak(boolean value) {
this.isOffPeak = value;
}
public long getSelectionTime() {
return this.selectionTime;
}
public long getSelectionNanoTime() {
return this.timeInNanos;
}
Collection<? extends StoreFile> getFiles();
/**
* Specify if this compaction should be a major compaction based on the state of the store
* @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
* compaction
* @return total size of all StoreFiles in compaction
*/
public void setIsMajor(boolean isMajor, boolean isAllFiles) {
assert isAllFiles || !isMajor;
this.isMajor = !isAllFiles ? DisplayCompactionType.MINOR
: (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
}
public void setTracker(CompactionLifeCycleTracker tracker) {
this.tracker = tracker;
}
public CompactionLifeCycleTracker getTracker() {
return tracker;
}
@Override
public String toString() {
String fsList = filesToCompact.stream().filter(f -> f.getReader() != null)
.map(f -> TraditionalBinaryPrefix.long2String(f.getReader().length(), "", 1))
.collect(Collectors.joining(", "));
return "regionName=" + regionName + ", storeName=" + storeName + ", fileCount=" +
this.getFiles().size() + ", fileSize=" +
TraditionalBinaryPrefix.long2String(totalSize, "", 1) +
((fsList.isEmpty()) ? "" : " (" + fsList + ")") + ", priority=" + priority + ", time=" +
timeInNanos;
}
long getSize();
/**
* Recalculate the size of the compaction based on current files.
* @param files files that should be included in the compaction
* @return <code>true</code> if major compaction or all files are compacted
*/
private void recalculateSize() {
this.totalSize = filesToCompact.stream().map(HStoreFile::getReader)
.mapToLong(r -> r != null ? r.length() : 0L).sum();
}
boolean isAllFiles();
/**
* @return <code>true</code> if major compaction
*/
boolean isMajor();
/**
* @return priority of compaction request
*/
int getPriority();
/**
* @return <code>true</code> if compaction is Off-peak
*/
boolean isOffPeak();
/**
* @return compaction request creation time in milliseconds
*/
long getSelectionTime();
}

View File

@ -0,0 +1,159 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
/**
* This class holds all logical details necessary to run a compaction.
*/
@InterfaceAudience.Private
public class CompactionRequestImpl implements CompactionRequest {
// was this compaction promoted to an off-peak
private boolean isOffPeak = false;
private enum DisplayCompactionType { MINOR, ALL_FILES, MAJOR }
private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;
private int priority = NO_PRIORITY;
private Collection<HStoreFile> filesToCompact;
// CompactRequest object creation time.
private long selectionTime;
private String regionName = "";
private String storeName = "";
private long totalSize = -1L;
private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
public CompactionRequestImpl(Collection<HStoreFile> files) {
this.selectionTime = EnvironmentEdgeManager.currentTime();
this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not null");
recalculateSize();
}
public void updateFiles(Collection<HStoreFile> files) {
this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not null");
recalculateSize();
}
@Override
public Collection<HStoreFile> getFiles() {
return Collections.unmodifiableCollection(this.filesToCompact);
}
/**
* Sets the region/store name, for logging.
*/
public void setDescription(String regionName, String storeName) {
this.regionName = regionName;
this.storeName = storeName;
}
/** Gets the total size of all StoreFiles in compaction */
@Override
public long getSize() {
return totalSize;
}
@Override
public boolean isAllFiles() {
return this.isMajor == DisplayCompactionType.MAJOR
|| this.isMajor == DisplayCompactionType.ALL_FILES;
}
@Override
public boolean isMajor() {
return this.isMajor == DisplayCompactionType.MAJOR;
}
/** Gets the priority for the request */
@Override
public int getPriority() {
return priority;
}
/** Sets the priority for the request */
public void setPriority(int p) {
this.priority = p;
}
@Override
public boolean isOffPeak() {
return this.isOffPeak;
}
public void setOffPeak(boolean value) {
this.isOffPeak = value;
}
@Override
public long getSelectionTime() {
return this.selectionTime;
}
/**
* Specify if this compaction should be a major compaction based on the state of the store
* @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
* compaction
*/
public void setIsMajor(boolean isMajor, boolean isAllFiles) {
assert isAllFiles || !isMajor;
this.isMajor = !isAllFiles ? DisplayCompactionType.MINOR
: (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
}
public void setTracker(CompactionLifeCycleTracker tracker) {
this.tracker = tracker;
}
public CompactionLifeCycleTracker getTracker() {
return tracker;
}
@Override
public String toString() {
String fsList = filesToCompact.stream().filter(f -> f.getReader() != null)
.map(f -> TraditionalBinaryPrefix.long2String(f.getReader().length(), "", 1))
.collect(Collectors.joining(", "));
return "regionName=" + regionName + ", storeName=" + storeName + ", fileCount=" +
this.getFiles().size() + ", fileSize=" +
TraditionalBinaryPrefix.long2String(totalSize, "", 1) +
((fsList.isEmpty()) ? "" : " (" + fsList + ")") + ", priority=" + priority + ", time=" +
selectionTime;
}
/**
* Recalculate the size of the compaction based on current files.
*/
private void recalculateSize() {
this.totalSize = filesToCompact.stream().map(HStoreFile::getReader)
.mapToLong(r -> r != null ? r.length() : 0L).sum();
}
}

View File

@ -229,7 +229,7 @@ public abstract class Compactor<T extends CellSink> {
protected interface InternalScannerFactory {
ScanType getScanType(CompactionRequest request);
ScanType getScanType(CompactionRequestImpl request);
InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException;
@ -238,7 +238,7 @@ public abstract class Compactor<T extends CellSink> {
protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {
@Override
public ScanType getScanType(CompactionRequest request) {
public ScanType getScanType(CompactionRequestImpl request) {
return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
: ScanType.COMPACT_RETAIN_DELETES;
}
@ -267,7 +267,7 @@ public abstract class Compactor<T extends CellSink> {
/* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
}
protected List<Path> compact(final CompactionRequest request,
protected List<Path> compact(final CompactionRequestImpl request,
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
@ -325,8 +325,8 @@ public abstract class Compactor<T extends CellSink> {
return commitWriter(writer, fd, request);
}
protected abstract List<Path> commitWriter(T writer, FileDetails fd, CompactionRequest request)
throws IOException;
protected abstract List<Path> commitWriter(T writer, FileDetails fd,
CompactionRequestImpl request) throws IOException;
protected abstract void abortWriter(T writer) throws IOException;
@ -340,14 +340,14 @@ public abstract class Compactor<T extends CellSink> {
* @param readPoint the read point to help create scanner by Coprocessor if required.
* @return Scanner override by coprocessor; null if not overriding.
*/
protected InternalScanner preCreateCoprocScanner(CompactionRequest request, ScanType scanType,
protected InternalScanner preCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType,
long earliestPutTs, List<StoreFileScanner> scanners, User user, long readPoint)
throws IOException {
if (store.getCoprocessorHost() == null) {
return null;
}
return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
earliestPutTs, request.getTracker(), user, readPoint);
earliestPutTs, request.getTracker(), request, user, readPoint);
}
/**
@ -357,13 +357,13 @@ public abstract class Compactor<T extends CellSink> {
* @param scanner The default scanner created for compaction.
* @return Scanner scanner to use (usually the default); null if compaction should not proceed.
*/
protected InternalScanner postCreateCoprocScanner(CompactionRequest request, ScanType scanType,
protected InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType,
InternalScanner scanner, User user) throws IOException {
if (store.getCoprocessorHost() == null) {
return scanner;
}
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
user);
request, user);
}
/**

View File

@ -186,9 +186,9 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
}
@Override
protected CompactionRequest createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
CompactionRequest result = tryingMajor ? selectMajorCompaction(candidateSelection)
CompactionRequestImpl result = tryingMajor ? selectMajorCompaction(candidateSelection)
: selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
if (LOG.isDebugEnabled()) {
LOG.debug("Generated compaction request: " + result);
@ -196,7 +196,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
return result;
}
public CompactionRequest selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
long now = EnvironmentEdgeManager.currentTime();
return new DateTieredCompactionRequest(candidateSelection,
this.getCompactBoundariesForMajor(candidateSelection, now));
@ -210,7 +210,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
* by seqId and maxTimestamp in descending order and build the time windows. All the out-of-order
* data into the same compaction windows, guaranteeing contiguous compaction based on sequence id.
*/
public CompactionRequest selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
long now = EnvironmentEdgeManager.currentTime();
long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now);
@ -261,7 +261,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
}
}
// A non-null file list is expected by HStore
return new CompactionRequest(Collections.emptyList());
return new CompactionRequestImpl(Collections.emptyList());
}
private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,

View File

@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.regionserver.HStoreFile;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS",
justification="It is intended to use the same equal method as superclass")
public class DateTieredCompactionRequest extends CompactionRequest {
public class DateTieredCompactionRequest extends CompactionRequestImpl {
private List<Long> boundaries;
public DateTieredCompactionRequest(Collection<HStoreFile> files, List<Long> boundaryList) {

View File

@ -45,7 +45,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
super(conf, store);
}
private boolean needEmptyFile(CompactionRequest request) {
private boolean needEmptyFile(CompactionRequestImpl request) {
// if we are going to compact the last N files, then we need to emit an empty file to retain the
// maxSeqId if we haven't written out anything.
OptionalLong maxSeqId = StoreUtils.getMaxSequenceIdInList(request.getFiles());
@ -54,7 +54,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
maxSeqId.getAsLong() == storeMaxSeqId.getAsLong();
}
public List<Path> compact(final CompactionRequest request, final List<Long> lowerBoundaries,
public List<Path> compact(final CompactionRequestImpl request, final List<Long> lowerBoundaries,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + lowerBoundaries.size()
@ -77,7 +77,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
@Override
protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
CompactionRequestImpl request) throws IOException {
return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
}
}

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
/**
* Compact passed set of files. Create an instance and then call
* {@link #compact(CompactionRequest, ThroughputController, User)}
* {@link #compact(CompactionRequestImpl, ThroughputController, User)}
*/
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor<StoreFileWriter> {
@ -61,16 +61,16 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*/
public List<Path> compact(final CompactionRequest request,
public List<Path> compact(final CompactionRequestImpl request,
ThroughputController throughputController, User user) throws IOException {
return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
}
/**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
* {@link #compact(CompactionRequest, ThroughputController, User)};
* Compact a list of files for testing. Creates a fake {@link CompactionRequestImpl} to pass to
* {@link #compact(CompactionRequestImpl, ThroughputController, User)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for the
* generated {@link CompactionRequest}.
* generated {@link CompactionRequestImpl}.
* @param isMajor true to major compact (prune all deletes, max versions, etc)
* @return Product of compaction or an empty list if all cells expired or deleted and nothing \
* made it through the compaction.
@ -78,14 +78,14 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
*/
public List<Path> compactForTesting(Collection<HStoreFile> filesToCompact, boolean isMajor)
throws IOException {
CompactionRequest cr = new CompactionRequest(filesToCompact);
CompactionRequestImpl cr = new CompactionRequestImpl(filesToCompact);
cr.setIsMajor(isMajor, isMajor);
return compact(cr, NoLimitThroughputController.INSTANCE, null);
}
@Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
writer.close();

View File

@ -55,7 +55,7 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
}
@Override
public CompactionRequest selectCompaction(Collection<HStoreFile> candidateFiles,
public CompactionRequestImpl selectCompaction(Collection<HStoreFile> candidateFiles,
List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
boolean forceMajor) throws IOException {
if(forceMajor){
@ -67,10 +67,10 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction,
mayUseOffPeak, forceMajor);
}
// Nothing to compact
Collection<HStoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
CompactionRequest result = new CompactionRequest(toCompact);
CompactionRequestImpl result = new CompactionRequestImpl(toCompact);
return result;
}
@ -123,7 +123,7 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
if (maxTtl == Long.MAX_VALUE
|| (currentTime - maxTtl < maxTs)){
continue;
} else if(filesCompacting == null || filesCompacting.contains(sf) == false){
} else if(filesCompacting == null || !filesCompacting.contains(sf)){
expiredStores.add(sf);
}
}

View File

@ -113,7 +113,7 @@ public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
}
@Override
protected CompactionRequest createCompactionRequest(ArrayList<HStoreFile>
protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile>
candidateSelection, boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck)
throws IOException {
if (!tryingMajor) {
@ -122,7 +122,7 @@ public class RatioBasedCompactionPolicy extends SortedCompactionPolicy {
candidateSelection = checkMinFilesCriteria(candidateSelection,
comConf.getMinFilesToCompact());
}
return new CompactionRequest(candidateSelection);
return new CompactionRequestImpl(candidateSelection);
}
/**

View File

@ -53,7 +53,7 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy {
* on seqId for data consistency.
* @return subset copy of candidate list that meets compaction criteria
*/
public CompactionRequest selectCompaction(Collection<HStoreFile> candidateFiles,
public CompactionRequestImpl selectCompaction(Collection<HStoreFile> candidateFiles,
List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
boolean forceMajor) throws IOException {
// Preliminary compaction subject to filters
@ -85,7 +85,7 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy {
// Or, if there are any references among the candidates.
boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
CompactionRequest result = createCompactionRequest(candidateSelection,
CompactionRequestImpl result = createCompactionRequest(candidateSelection,
isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
ArrayList<HStoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
@ -99,7 +99,7 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy {
return result;
}
protected abstract CompactionRequest createCompactionRequest(
protected abstract CompactionRequestImpl createCompactionRequest(
ArrayList<HStoreFile> candidateSelection, boolean tryingMajor, boolean mayUseOffPeak,
boolean mayBeStuck) throws IOException;

View File

@ -74,7 +74,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
}
public StripeCompactionRequest createEmptyRequest(
StripeInformationProvider si, CompactionRequest request) {
StripeInformationProvider si, CompactionRequestImpl request) {
// Treat as L0-ish compaction with fixed set of files, and hope for the best.
if (si.getStripeCount() > 0) {
return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
@ -376,7 +376,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
/** Stripe compaction request wrapper. */
public abstract static class StripeCompactionRequest {
protected CompactionRequest request;
protected CompactionRequestImpl request;
protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
public List<Path> execute(StripeCompactor compactor,
@ -392,7 +392,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
public abstract List<Path> execute(StripeCompactor compactor,
ThroughputController throughputController, User user) throws IOException;
public StripeCompactionRequest(CompactionRequest request) {
public StripeCompactionRequest(CompactionRequestImpl request) {
this.request = request;
}
@ -407,11 +407,11 @@ public class StripeCompactionPolicy extends CompactionPolicy {
this.majorRangeToRow = endRow;
}
public CompactionRequest getRequest() {
public CompactionRequestImpl getRequest() {
return this.request;
}
public void setRequest(CompactionRequest request) {
public void setRequest(CompactionRequestImpl request) {
assert request != null;
this.request = request;
this.majorRangeFromRow = this.majorRangeToRow = null;
@ -429,7 +429,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
* @param request Original request.
* @param targetBoundaries New files should be written with these boundaries.
*/
public BoundaryStripeCompactionRequest(CompactionRequest request,
public BoundaryStripeCompactionRequest(CompactionRequestImpl request,
List<byte[]> targetBoundaries) {
super(request);
this.targetBoundaries = targetBoundaries;
@ -437,7 +437,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
public BoundaryStripeCompactionRequest(Collection<HStoreFile> files,
List<byte[]> targetBoundaries) {
this(new CompactionRequest(files), targetBoundaries);
this(new CompactionRequestImpl(files), targetBoundaries);
}
@Override
@ -467,7 +467,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
* @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
* total number of kvs, all the overflow data goes into the last stripe.
*/
public SplitStripeCompactionRequest(CompactionRequest request,
public SplitStripeCompactionRequest(CompactionRequestImpl request,
byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
super(request);
this.startRow = startRow;
@ -483,7 +483,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
public SplitStripeCompactionRequest(Collection<HStoreFile> files,
byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
this(new CompactionRequest(files), startRow, endRow, targetCount, targetKvs);
this(new CompactionRequestImpl(files), startRow, endRow, targetCount, targetKvs);
}
@Override

View File

@ -58,7 +58,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
}
@Override
public ScanType getScanType(CompactionRequest request) {
public ScanType getScanType(CompactionRequestImpl request) {
// If majorRangeFromRow and majorRangeToRow are not null, then we will not use the return
// value to create InternalScanner. See the createScanner method below. The return value is
// also used when calling coprocessor hooks.
@ -76,7 +76,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
}
}
public List<Path> compact(CompactionRequest request, final List<byte[]> targetBoundaries,
public List<Path> compact(CompactionRequestImpl request, final List<byte[]> targetBoundaries,
final byte[] majorRangeFromRow, final byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
@ -101,7 +101,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
}, throughputController, user);
}
public List<Path> compact(CompactionRequest request, final int targetCount, final long targetSize,
public List<Path> compact(CompactionRequestImpl request, final int targetCount, final long targetSize,
final byte[] left, final byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
@ -125,7 +125,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
@Override
protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
return newFiles;

View File

@ -113,6 +113,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.security.AccessDeniedException;
@ -1572,8 +1573,8 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
throws IOException {
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
requirePermission(getActiveUser(c), "compact", getTableName(c.getEnvironment()), null, null,
Action.ADMIN, Action.CREATE);
return scanner;

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -265,7 +266,8 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker request, long readPoint)
long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker,
CompactionRequest request, long readPoint)
throws IOException {
return createCompactorScanner((HStore) store, scanners, scanType, earliestPutTs);
}

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
@ -202,20 +203,22 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
@Override
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {
List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
ctPreCompactSelect.incrementAndGet();
}
@Override
public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ImmutableList<? extends StoreFile> selected, CompactionLifeCycleTracker tracker) {
ImmutableList<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
CompactionRequest request) {
ctPostCompactSelect.incrementAndGet();
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
throws IOException {
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
ctPreCompact.incrementAndGet();
return scanner;
}
@ -223,14 +226,16 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
ctPreCompactScanner.incrementAndGet();
return s;
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
StoreFile resultFile, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
ctPostCompact.incrementAndGet();
}

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Rule;
@ -201,13 +202,15 @@ public class TestCoprocessorInterface {
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) {
Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) {
preCompactCalled = true;
return scanner;
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker) {
Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker,
CompactionRequest request) {
postCompactCalled = true;
}
@Override

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
@ -423,7 +424,8 @@ public class TestRegionObserverInterface {
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) {
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) {
return new InternalScanner() {
@Override
public boolean next(List<Cell> results) throws IOException {
@ -462,7 +464,7 @@ public class TestRegionObserverInterface {
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
StoreFile resultFile, CompactionLifeCycleTracker tracker) {
StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) {
lastCompaction = EnvironmentEdgeManager.currentTime();
}

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
@ -177,7 +178,8 @@ public class TestRegionObserverScannerOpenHook {
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint)
long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker,
CompactionRequest request, long readPoint)
throws IOException {
scanners.forEach(KeyValueScanner::close);
return NO_DATA;

View File

@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -730,7 +731,8 @@ public class TestMobCompactor {
@Override
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker)
List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker,
CompactionRequest request)
throws IOException {
int count = candidates.size();
if (count >= 2) {

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -465,7 +466,8 @@ public class TestNamespaceAuditor {
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
throws IOException {
postCompact.countDown();
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
/**
* RegionObserver that just reimplements the default behavior,
@ -69,7 +70,8 @@ public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver
public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
HStore hs = (HStore) store;
// this demonstrates how to override the scanners default behavior
ScanInfo oldSI = hs.getScanInfo();

View File

@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
@ -415,7 +415,7 @@ public class TestCompaction {
@Override
public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
this.request = new CompactionRequest(selectedFiles);
this.request = new CompactionRequestImpl(selectedFiles);
this.request.setPriority(getPriority());
return true;
}
@ -496,7 +496,7 @@ public class TestCompaction {
@Override
public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
throws IOException {
this.request = new CompactionRequest(new ArrayList<>());
this.request = new CompactionRequestImpl(new ArrayList<>());
return true;
}
}

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.util.Bytes;
@ -192,7 +192,7 @@ public class TestCompactionPolicy {
long... expected) throws IOException {
store.forceMajor = forcemajor;
// Test Default compactions
CompactionRequest result =
CompactionRequestImpl result =
((RatioBasedCompactionPolicy) store.storeEngine.getCompactionPolicy()).selectCompaction(
candidates, new ArrayList<>(), false, isOffPeak, forcemajor);
List<HStoreFile> actual = new ArrayList<>(result.getFiles());

View File

@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -173,7 +173,7 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy {
}
}
// Test Default compactions
CompactionRequest result = ((RatioBasedCompactionPolicy) store.storeEngine
CompactionRequestImpl result = ((RatioBasedCompactionPolicy) store.storeEngine
.getCompactionPolicy()).selectCompaction(candidates,
new ArrayList<>(), false, false, false);
Assert.assertTrue(result.getFiles().isEmpty());

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -262,7 +263,8 @@ public class TestHRegionServerBulkLoad {
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request)
throws IOException {
try {
Thread.sleep(sleepDuration);

View File

@ -55,7 +55,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -417,7 +417,7 @@ public class TestMajorCompaction {
}
store.triggerMajorCompaction();
CompactionRequest request = store.requestCompaction().get().getRequest();
CompactionRequestImpl request = store.requestCompaction().get().getRequest();
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"System-requested major compaction should not occur if there are too many store files",
@ -436,7 +436,7 @@ public class TestMajorCompaction {
createStoreFile(r);
}
store.triggerMajorCompaction();
CompactionRequest request =
CompactionRequestImpl request =
store.requestCompaction(PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null).get()
.getRequest();
assertNotNull("Expected to receive a compaction request", request);

View File

@ -37,7 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
@ -76,7 +76,7 @@ public class TestStripeStoreEngine {
StripeCompactor mockCompactor = mock(StripeCompactor.class);
se.setCompactorOverride(mockCompactor);
when(
mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class),
mockCompactor.compact(any(CompactionRequestImpl.class), anyInt(), anyLong(), any(byte[].class),
any(byte[].class), any(byte[].class), any(byte[].class),
any(ThroughputController.class), any(User.class)))
.thenReturn(new ArrayList<>());
@ -92,7 +92,7 @@ public class TestStripeStoreEngine {
// Override the file list. Granted, overriding this compaction in this manner will
// break things in real world, but we only want to verify the override.
compactUs.remove(sf);
CompactionRequest req = new CompactionRequest(compactUs);
CompactionRequestImpl req = new CompactionRequestImpl(compactUs);
compaction.forceSelect(req);
assertEquals(2, compaction.getRequest().getFiles().size());
assertFalse(compaction.getRequest().getFiles().contains(sf));

View File

@ -169,7 +169,7 @@ public class PerfTestCompactionPolicies extends MockStoreFileGenerator {
private List<HStoreFile> runIteration(List<HStoreFile> startingStoreFiles) throws IOException {
List<HStoreFile> storeFiles = new ArrayList<>(startingStoreFiles);
CompactionRequest req = cp.selectCompaction(
CompactionRequestImpl req = cp.selectCompaction(
storeFiles, new ArrayList<>(), false, false, false);
long newFileSize = 0;

View File

@ -71,8 +71,8 @@ public class TestCompactor {
return sf;
}
public static CompactionRequest createDummyRequest() throws Exception {
return new CompactionRequest(Arrays.asList(createDummyStoreFile(1L)));
public static CompactionRequestImpl createDummyRequest() throws Exception {
return new CompactionRequestImpl(Arrays.asList(createDummyStoreFile(1L)));
}
// StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted.

View File

@ -130,7 +130,7 @@ public class TestDateTieredCompactor {
HStoreFile sf1 = createDummyStoreFile(1L);
HStoreFile sf2 = createDummyStoreFile(2L);
DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
List<Path> paths = dtc.compact(new CompactionRequest(Arrays.asList(sf1)),
List<Path> paths = dtc.compact(new CompactionRequestImpl(Arrays.asList(sf1)),
boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null);
writers.verifyKvs(output, allFiles, boundaries);
if (allFiles) {
@ -156,7 +156,7 @@ public class TestDateTieredCompactor {
@Test
public void testEmptyOutputFile() throws Exception {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
CompactionRequest request = createDummyRequest();
CompactionRequestImpl request = createDummyRequest();
DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0],
new ArrayList<>(request.getFiles()));
List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),

View File

@ -230,7 +230,9 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
// UnmodifiableCollection does not implement equals so we need to change it here to a
// collection that implements it.
assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles()));
scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),

View File

@ -874,7 +874,7 @@ public class TestAccessController extends SecureTestUtil {
@Override
public Object run() throws Exception {
ACCESS_CONTROLLER.preCompact(ObserverContext.createAndPrepare(RCP_ENV, null), null, null,
ScanType.COMPACT_RETAIN_DELETES, null);
ScanType.COMPACT_RETAIN_DELETES, null, null);
return null;
}
};

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.wal.WALEdit;
@ -272,7 +273,8 @@ public class TestCoprocessorScanPolicy {
public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s,CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
InternalScanner s,CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
HStore hs = (HStore) store;
Long newTtl = ttls.get(store.getTableName());
Integer newVersions = versions.get(store.getTableName());