HBASE-18453 CompactionRequest should not be exposed to user directly
This commit is contained in:
parent
38e983ed44
commit
61d10feffa
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
|
|||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -203,7 +204,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
|
|||
@Override
|
||||
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
|
||||
InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
|
||||
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
|
||||
ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
|
||||
if (scanInfo == null) {
|
||||
// take default action
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
|
@ -144,15 +145,17 @@ public class TestRefreshHFilesEndpoint {
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<Store> getStores() {
|
||||
List<Store> list = new ArrayList<Store>(stores.size());
|
||||
public List<HStore> getStores() {
|
||||
List<HStore> list = new ArrayList<>(stores.size());
|
||||
/**
|
||||
* This is used to trigger the custom definition (faulty)
|
||||
* of refresh HFiles API.
|
||||
*/
|
||||
try {
|
||||
if (this.store == null)
|
||||
store = new HStoreWithFaultyRefreshHFilesAPI(this, new HColumnDescriptor(FAMILY), this.conf);
|
||||
if (this.store == null) {
|
||||
store = new HStoreWithFaultyRefreshHFilesAPI(this,
|
||||
ColumnFamilyDescriptorBuilder.of(FAMILY), this.conf);
|
||||
}
|
||||
list.add(store);
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Couldn't instantiate custom store implementation", ioe);
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.ArrayList;
|
|||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
|||
import org.apache.hadoop.hbase.mapreduce.JobUtil;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
@ -158,10 +160,13 @@ public class CompactionTool extends Configured implements Tool {
|
|||
store.triggerMajorCompaction();
|
||||
}
|
||||
do {
|
||||
CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
|
||||
if (compaction == null) break;
|
||||
Optional<CompactionContext> compaction =
|
||||
store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
|
||||
if (!compaction.isPresent()) {
|
||||
break;
|
||||
}
|
||||
List<StoreFile> storeFiles =
|
||||
store.compact(compaction, NoLimitThroughputController.INSTANCE);
|
||||
store.compact(compaction.get(), NoLimitThroughputController.INSTANCE);
|
||||
if (storeFiles != null && !storeFiles.isEmpty()) {
|
||||
if (keepCompactedFiles && deleteCompacted) {
|
||||
for (StoreFile storeFile: storeFiles) {
|
||||
|
|
|
@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
|
|||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
@ -186,10 +186,10 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param c the environment provided by the region server
|
||||
* @param store the store where compaction is being requested
|
||||
* @param candidates the store files currently available for compaction
|
||||
* @param request custom compaction request
|
||||
* @param tracker tracker used to track the life cycle of a compaction
|
||||
*/
|
||||
default void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
List<StoreFile> candidates, CompactionRequest request) throws IOException {}
|
||||
List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {}
|
||||
|
||||
/**
|
||||
* Called after the {@link StoreFile}s to compact have been selected from the available
|
||||
|
@ -197,10 +197,10 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param c the environment provided by the region server
|
||||
* @param store the store being compacted
|
||||
* @param selected the store files selected to compact
|
||||
* @param request custom compaction request
|
||||
* @param tracker tracker used to track the life cycle of a compaction
|
||||
*/
|
||||
default void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
ImmutableList<StoreFile> selected, CompactionRequest request) {}
|
||||
ImmutableList<StoreFile> selected, CompactionLifeCycleTracker tracker) {}
|
||||
|
||||
/**
|
||||
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
|
||||
|
@ -220,13 +220,13 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param store the store being compacted
|
||||
* @param scanner the scanner over existing data used in the store file rewriting
|
||||
* @param scanType type of Scan
|
||||
* @param request the requested compaction
|
||||
* @param tracker tracker used to track the life cycle of a compaction
|
||||
* @return the scanner to use during compaction. Should not be {@code null} unless the
|
||||
* implementation is writing new store files on its own.
|
||||
*/
|
||||
default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
Store store, InternalScanner scanner, ScanType scanType,
|
||||
CompactionRequest request) throws IOException {
|
||||
default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
|
||||
throws IOException {
|
||||
return scanner;
|
||||
}
|
||||
|
||||
|
@ -245,14 +245,14 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
|
||||
* files
|
||||
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
|
||||
* @param request compaction request
|
||||
* @param tracker used to track the life cycle of a compaction
|
||||
* @param readPoint the readpoint to create scanner
|
||||
* @return the scanner to use during compaction. {@code null} if the default implementation is to
|
||||
* be used.
|
||||
*/
|
||||
default InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
|
||||
InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
|
||||
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
|
||||
return s;
|
||||
}
|
||||
|
||||
|
@ -261,10 +261,10 @@ public interface RegionObserver extends Coprocessor {
|
|||
* @param c the environment provided by the region server
|
||||
* @param store the store being compacted
|
||||
* @param resultFile the new store file written out during compaction
|
||||
* @param request the requested compaction
|
||||
* @param tracker used to track the life cycle of a compaction
|
||||
*/
|
||||
default void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
StoreFile resultFile, CompactionRequest request) throws IOException {}
|
||||
StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {}
|
||||
|
||||
/**
|
||||
* Called before the region is reported as closed to the master.
|
||||
|
@ -798,12 +798,12 @@ public interface RegionObserver extends Coprocessor {
|
|||
* Called before a store opens a new scanner.
|
||||
* This hook is called when a "user" scanner is opened.
|
||||
* <p>
|
||||
* See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext,
|
||||
* Store, List, ScanType, long, InternalScanner, CompactionRequest, long)}
|
||||
* to override scanners created for flushes or compactions, resp.
|
||||
* See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
|
||||
* and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
|
||||
* InternalScanner, CompactionLifeCycleTracker, long)} to override scanners created for flushes
|
||||
* or compactions, resp.
|
||||
* <p>
|
||||
* Call CoprocessorEnvironment#complete to skip any subsequent chained
|
||||
* coprocessors.
|
||||
* Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors.
|
||||
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
|
||||
* effect in this hook.
|
||||
* <p>
|
||||
|
|
|
@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
@ -41,24 +40,23 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
|||
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.StealJobQueue;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Compact region on request and then run split if appropriate
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CompactSplit implements CompactionRequestor, PropagatingConfigurationObserver {
|
||||
public class CompactSplit implements PropagatingConfigurationObserver {
|
||||
private static final Log LOG = LogFactory.getLog(CompactSplit.class);
|
||||
|
||||
// Configuration key for the large compaction threads.
|
||||
|
@ -233,127 +231,90 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
|
||||
throws IOException {
|
||||
return requestCompaction(r, why, null);
|
||||
public synchronized void requestCompaction(HRegion region, String why, int priority,
|
||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
requestCompactionInternal(region, why, priority, true, tracker, user);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
|
||||
List<Pair<CompactionRequest, Store>> requests) throws IOException {
|
||||
return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
|
||||
public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
|
||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
requestCompactionInternal(region, store, why, priority, true, tracker, user);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
|
||||
final String why, CompactionRequest request) throws IOException {
|
||||
return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
|
||||
private void requestCompactionInternal(HRegion region, String why, int priority,
|
||||
boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
// request compaction on all stores
|
||||
for (HStore store : region.stores.values()) {
|
||||
requestCompactionInternal(region, store, why, priority, selectNow, tracker, user);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
|
||||
int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
|
||||
return requestCompactionInternal(r, why, p, requests, true, user);
|
||||
private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
|
||||
boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
if (this.server.isStopped() || (region.getTableDescriptor() != null &&
|
||||
!region.getTableDescriptor().isCompactionEnabled())) {
|
||||
return;
|
||||
}
|
||||
|
||||
private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
|
||||
int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
|
||||
throws IOException {
|
||||
// not a special compaction request, so make our own list
|
||||
List<CompactionRequest> ret = null;
|
||||
if (requests == null) {
|
||||
ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
|
||||
for (Store s : r.getStores()) {
|
||||
CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
|
||||
if (selectNow) ret.add(cr);
|
||||
Optional<CompactionContext> compaction;
|
||||
if (selectNow) {
|
||||
compaction = selectCompaction(region, store, priority, tracker, user);
|
||||
if (!compaction.isPresent()) {
|
||||
// message logged inside
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
|
||||
ret = new ArrayList<CompactionRequest>(requests.size());
|
||||
for (Pair<CompactionRequest, Store> pair : requests) {
|
||||
ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
compaction = Optional.empty();
|
||||
}
|
||||
|
||||
public CompactionRequest requestCompaction(final Region r, final Store s,
|
||||
final String why, int priority, CompactionRequest request, User user) throws IOException {
|
||||
return requestCompactionInternal(r, s, why, priority, request, true, user);
|
||||
}
|
||||
|
||||
public synchronized void requestSystemCompaction(
|
||||
final Region r, final String why) throws IOException {
|
||||
requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
|
||||
}
|
||||
|
||||
public void requestSystemCompaction(
|
||||
final Region r, final Store s, final String why) throws IOException {
|
||||
requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param r region store belongs to
|
||||
* @param s Store to request compaction on
|
||||
* @param why Why compaction requested -- used in debug messages
|
||||
* @param priority override the default priority (NO_PRIORITY == decide)
|
||||
* @param request custom compaction request. Can be <tt>null</tt> in which case a simple
|
||||
* compaction will be used.
|
||||
*/
|
||||
private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
|
||||
final String why, int priority, CompactionRequest request, boolean selectNow, User user)
|
||||
throws IOException {
|
||||
if (this.server.isStopped()
|
||||
|| (r.getTableDescriptor() != null && !r.getTableDescriptor().isCompactionEnabled())) {
|
||||
return null;
|
||||
}
|
||||
|
||||
CompactionContext compaction = null;
|
||||
if (selectNow) {
|
||||
compaction = selectCompaction(r, s, priority, request, user);
|
||||
if (compaction == null) return null; // message logged inside
|
||||
}
|
||||
|
||||
final RegionServerSpaceQuotaManager spaceQuotaManager =
|
||||
RegionServerSpaceQuotaManager spaceQuotaManager =
|
||||
this.server.getRegionServerSpaceQuotaManager();
|
||||
if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled(
|
||||
r.getTableDescriptor().getTableName())) {
|
||||
if (spaceQuotaManager != null &&
|
||||
spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation "
|
||||
+ " policy disallows compactions.");
|
||||
LOG.debug("Ignoring compaction request for " + region +
|
||||
" as an active space quota violation " + " policy disallows compactions.");
|
||||
}
|
||||
return null;
|
||||
return;
|
||||
}
|
||||
|
||||
ThreadPoolExecutor pool;
|
||||
if (selectNow) {
|
||||
// compaction.get is safe as we will just return if selectNow is true but no compaction is
|
||||
// selected
|
||||
pool = store.throttleCompaction(compaction.get().getRequest().getSize()) ? longCompactions
|
||||
: shortCompactions;
|
||||
} else {
|
||||
// We assume that most compactions are small. So, put system compactions into small
|
||||
// pool; we will do selection there, and move to large pool if necessary.
|
||||
ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
|
||||
? longCompactions : shortCompactions;
|
||||
pool.execute(new CompactionRunner(s, r, compaction, pool, user));
|
||||
((HRegion)r).incrementCompactionsQueuedCount();
|
||||
pool = shortCompactions;
|
||||
}
|
||||
pool.execute(new CompactionRunner(store, region, compaction, pool, user));
|
||||
region.incrementCompactionsQueuedCount();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
String type = (pool == shortCompactions) ? "Small " : "Large ";
|
||||
LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
|
||||
+ (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
|
||||
}
|
||||
return selectNow ? compaction.getRequest() : null;
|
||||
}
|
||||
|
||||
private CompactionContext selectCompaction(final Region r, final Store s,
|
||||
int priority, CompactionRequest request, User user) throws IOException {
|
||||
CompactionContext compaction = s.requestCompaction(priority, request, user);
|
||||
if (compaction == null) {
|
||||
if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
|
||||
LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
|
||||
public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
|
||||
requestCompactionInternal(region, why, Store.NO_PRIORITY, false,
|
||||
CompactionLifeCycleTracker.DUMMY, null);
|
||||
}
|
||||
|
||||
public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
|
||||
throws IOException {
|
||||
requestCompactionInternal(region, store, why, Store.NO_PRIORITY, false,
|
||||
CompactionLifeCycleTracker.DUMMY, null);
|
||||
}
|
||||
|
||||
private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
|
||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
|
||||
if (!compaction.isPresent() && LOG.isDebugEnabled() && region.getRegionInfo() != null) {
|
||||
LOG.debug("Not compacting " + region.getRegionInfo().getRegionNameAsString() +
|
||||
" because compaction request was cancelled");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
assert compaction.hasSelection();
|
||||
if (priority != Store.NO_PRIORITY) {
|
||||
compaction.getRequest().setPriority(priority);
|
||||
}
|
||||
return compaction;
|
||||
}
|
||||
|
||||
|
@ -468,33 +429,33 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
|
|||
if (cmp != 0) {
|
||||
return cmp;
|
||||
}
|
||||
CompactionContext c1 = o1.compaction;
|
||||
CompactionContext c2 = o2.compaction;
|
||||
if (c1 == null) {
|
||||
return c2 == null ? 0 : 1;
|
||||
Optional<CompactionContext> c1 = o1.compaction;
|
||||
Optional<CompactionContext> c2 = o2.compaction;
|
||||
if (c1.isPresent()) {
|
||||
return c2.isPresent() ? compare(c1.get().getRequest(), c2.get().getRequest()) : -1;
|
||||
} else {
|
||||
return c2 == null ? -1 : compare(c1.getRequest(), c2.getRequest());
|
||||
return c2.isPresent() ? 1 : 0;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
private final class CompactionRunner implements Runnable {
|
||||
private final Store store;
|
||||
private final HStore store;
|
||||
private final HRegion region;
|
||||
private CompactionContext compaction;
|
||||
private final Optional<CompactionContext> compaction;
|
||||
private int queuedPriority;
|
||||
private ThreadPoolExecutor parent;
|
||||
private User user;
|
||||
private long time;
|
||||
|
||||
public CompactionRunner(Store store, Region region, CompactionContext compaction,
|
||||
public CompactionRunner(HStore store, HRegion region, Optional<CompactionContext> compaction,
|
||||
ThreadPoolExecutor parent, User user) {
|
||||
super();
|
||||
this.store = store;
|
||||
this.region = (HRegion) region;
|
||||
this.region = region;
|
||||
this.compaction = compaction;
|
||||
this.queuedPriority =
|
||||
compaction == null ? store.getCompactPriority() : compaction.getRequest().getPriority();
|
||||
this.queuedPriority = compaction.isPresent() ? compaction.get().getRequest().getPriority()
|
||||
: store.getCompactPriority();
|
||||
this.parent = parent;
|
||||
this.user = user;
|
||||
this.time = System.currentTimeMillis();
|
||||
|
@ -502,14 +463,15 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return (this.compaction != null) ? ("Request = " + compaction.getRequest())
|
||||
: ("regionName = " + region.toString() + ", storeName = " + store.toString() +
|
||||
return compaction.map(c -> "Request = " + c.getRequest())
|
||||
.orElse("regionName = " + region.toString() + ", storeName = " + store.toString() +
|
||||
", priority = " + queuedPriority + ", time = " + time);
|
||||
}
|
||||
|
||||
private void doCompaction(User user) {
|
||||
CompactionContext c;
|
||||
// Common case - system compaction without a file selection. Select now.
|
||||
if (this.compaction == null) {
|
||||
if (!compaction.isPresent()) {
|
||||
int oldPriority = this.queuedPriority;
|
||||
this.queuedPriority = this.store.getCompactPriority();
|
||||
if (this.queuedPriority > oldPriority) {
|
||||
|
@ -518,44 +480,49 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
|
|||
this.parent.execute(this);
|
||||
return;
|
||||
}
|
||||
Optional<CompactionContext> selected;
|
||||
try {
|
||||
this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
|
||||
selected = selectCompaction(this.region, this.store, queuedPriority,
|
||||
CompactionLifeCycleTracker.DUMMY, user);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Compaction selection failed " + this, ex);
|
||||
server.checkFileSystem();
|
||||
region.decrementCompactionsQueuedCount();
|
||||
return;
|
||||
}
|
||||
if (this.compaction == null) {
|
||||
if (!selected.isPresent()) {
|
||||
region.decrementCompactionsQueuedCount();
|
||||
return; // nothing to do
|
||||
}
|
||||
c = selected.get();
|
||||
assert c.hasSelection();
|
||||
// Now see if we are in correct pool for the size; if not, go to the correct one.
|
||||
// We might end up waiting for a while, so cancel the selection.
|
||||
assert this.compaction.hasSelection();
|
||||
ThreadPoolExecutor pool = store.throttleCompaction(
|
||||
compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
|
||||
|
||||
ThreadPoolExecutor pool =
|
||||
store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;
|
||||
|
||||
// Long compaction pool can process small job
|
||||
// Short compaction pool should not process large job
|
||||
if (this.parent == shortCompactions && pool == longCompactions) {
|
||||
this.store.cancelRequestedCompaction(this.compaction);
|
||||
this.compaction = null;
|
||||
this.store.cancelRequestedCompaction(c);
|
||||
this.parent = pool;
|
||||
this.parent.execute(this);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
c = compaction.get();
|
||||
}
|
||||
// Finally we can compact something.
|
||||
assert this.compaction != null;
|
||||
assert c != null;
|
||||
|
||||
this.compaction.getRequest().beforeExecute();
|
||||
c.getRequest().getTracker().beforeExecute(store);
|
||||
try {
|
||||
// Note: please don't put single-compaction logic here;
|
||||
// put it into region/store/etc. This is CST logic.
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
boolean completed =
|
||||
region.compact(compaction, store, compactionThroughputController, user);
|
||||
region.compact(c, store, compactionThroughputController, user);
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
|
||||
this + "; duration=" + StringUtils.formatTimeDiff(now, start));
|
||||
|
@ -582,10 +549,10 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
|
|||
region.reportCompactionRequestFailure();
|
||||
server.checkFileSystem();
|
||||
} finally {
|
||||
c.getRequest().getTracker().afterExecute(store);
|
||||
region.decrementCompactionsQueuedCount();
|
||||
LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
|
||||
}
|
||||
this.compaction.getRequest().afterExecute();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -615,9 +582,9 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
|
|||
@Override
|
||||
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
|
||||
if (runnable instanceof CompactionRunner) {
|
||||
CompactionRunner runner = (CompactionRunner)runnable;
|
||||
CompactionRunner runner = (CompactionRunner) runnable;
|
||||
LOG.debug("Compaction Rejected: " + runner);
|
||||
runner.store.cancelRequestedCompaction(runner.compaction);
|
||||
runner.compaction.ifPresent(c -> runner.store.cancelRequestedCompaction(c));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,100 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public interface CompactionRequestor {
|
||||
/**
|
||||
* @param r Region to compact
|
||||
* @param why Why compaction was requested -- used in debug messages
|
||||
* @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
|
||||
* compactions were started
|
||||
* @throws IOException
|
||||
*/
|
||||
List<CompactionRequest> requestCompaction(final Region r, final String why)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* @param r Region to compact
|
||||
* @param why Why compaction was requested -- used in debug messages
|
||||
* @param requests custom compaction requests. Each compaction must specify the store on which it
|
||||
* is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
|
||||
* stores for the region.
|
||||
* @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
|
||||
* compactions were started
|
||||
* @throws IOException
|
||||
*/
|
||||
List<CompactionRequest> requestCompaction(
|
||||
final Region r, final String why, List<Pair<CompactionRequest, Store>> requests
|
||||
)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* @param r Region to compact
|
||||
* @param s Store within region to compact
|
||||
* @param why Why compaction was requested -- used in debug messages
|
||||
* @param request custom compaction request for the {@link Region} and {@link Store}. Custom
|
||||
* request must be <tt>null</tt> or be constructed with matching region and store.
|
||||
* @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started.
|
||||
* @throws IOException
|
||||
*/
|
||||
CompactionRequest requestCompaction(
|
||||
final Region r, final Store s, final String why, CompactionRequest request
|
||||
) throws IOException;
|
||||
|
||||
/**
|
||||
* @param r Region to compact
|
||||
* @param why Why compaction was requested -- used in debug messages
|
||||
* @param pri Priority of this compaction. minHeap. <=0 is critical
|
||||
* @param requests custom compaction requests. Each compaction must specify the store on which it
|
||||
* is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
|
||||
* stores for the region.
|
||||
* @param user the effective user
|
||||
* @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
|
||||
* compactions were started.
|
||||
* @throws IOException
|
||||
*/
|
||||
List<CompactionRequest> requestCompaction(
|
||||
final Region r, final String why, int pri, List<Pair<CompactionRequest, Store>> requests,
|
||||
User user
|
||||
) throws IOException;
|
||||
|
||||
/**
|
||||
* @param r Region to compact
|
||||
* @param s Store within region to compact
|
||||
* @param why Why compaction was requested -- used in debug messages
|
||||
* @param pri Priority of this compaction. minHeap. <=0 is critical
|
||||
* @param request custom compaction request to run. {@link Store} and {@link Region} for the
|
||||
* request must match the region and store specified here.
|
||||
* @param user
|
||||
* @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started
|
||||
* @throws IOException
|
||||
*/
|
||||
CompactionRequest requestCompaction(
|
||||
final Region r, final Store s, final String why, int pri, CompactionRequest request, User user
|
||||
) throws IOException;
|
||||
}
|
|
@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
* enough, then all stores will be flushed.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||
public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
|
||||
public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(FlushAllLargeStoresPolicy.class);
|
||||
|
||||
|
@ -48,20 +48,22 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
|
|||
}
|
||||
|
||||
@Override
|
||||
public Collection<Store> selectStoresToFlush() {
|
||||
public Collection<HStore> selectStoresToFlush() {
|
||||
// no need to select stores if only one family
|
||||
if (region.getTableDescriptor().getColumnFamilyCount() == 1) {
|
||||
return region.stores.values();
|
||||
}
|
||||
// start selection
|
||||
Collection<Store> stores = region.stores.values();
|
||||
Set<Store> specificStoresToFlush = new HashSet<>();
|
||||
for (Store store : stores) {
|
||||
Collection<HStore> stores = region.stores.values();
|
||||
Set<HStore> specificStoresToFlush = new HashSet<>();
|
||||
for (HStore store : stores) {
|
||||
if (shouldFlush(store)) {
|
||||
specificStoresToFlush.add(store);
|
||||
}
|
||||
}
|
||||
if (!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
|
||||
if (!specificStoresToFlush.isEmpty()) {
|
||||
return specificStoresToFlush;
|
||||
}
|
||||
|
||||
// Didn't find any CFs which were above the threshold for selection.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -71,8 +73,8 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldFlush(Store store) {
|
||||
return (super.shouldFlush(store) || region.shouldFlushStore(store));
|
||||
protected boolean shouldFlush(HStore store) {
|
||||
return super.shouldFlush(store) || region.shouldFlushStore(store);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
public class FlushAllStoresPolicy extends FlushPolicy {
|
||||
|
||||
@Override
|
||||
public Collection<Store> selectStoresToFlush() {
|
||||
public Collection<HStore> selectStoresToFlush() {
|
||||
return region.stores.values();
|
||||
}
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ public abstract class FlushLargeStoresPolicy extends FlushPolicy {
|
|||
return flushSizeLowerBound;
|
||||
}
|
||||
|
||||
protected boolean shouldFlush(Store store) {
|
||||
protected boolean shouldFlush(HStore store) {
|
||||
if (store.getSizeOfMemStore().getDataSize() > this.flushSizeLowerBound) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " +
|
||||
|
|
|
@ -32,26 +32,31 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||
public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy {
|
||||
|
||||
private Collection<Store> regularStores = new HashSet<>();
|
||||
private Collection<Store> sloppyStores = new HashSet<>();
|
||||
private Collection<HStore> regularStores = new HashSet<>();
|
||||
private Collection<HStore> sloppyStores = new HashSet<>();
|
||||
|
||||
/**
|
||||
* @return the stores need to be flushed.
|
||||
*/
|
||||
@Override public Collection<Store> selectStoresToFlush() {
|
||||
Collection<Store> specificStoresToFlush = new HashSet<>();
|
||||
for(Store store : regularStores) {
|
||||
if(shouldFlush(store) || region.shouldFlushStore(store)) {
|
||||
@Override
|
||||
public Collection<HStore> selectStoresToFlush() {
|
||||
Collection<HStore> specificStoresToFlush = new HashSet<>();
|
||||
for (HStore store : regularStores) {
|
||||
if (shouldFlush(store) || region.shouldFlushStore(store)) {
|
||||
specificStoresToFlush.add(store);
|
||||
}
|
||||
}
|
||||
if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
|
||||
for(Store store : sloppyStores) {
|
||||
if(shouldFlush(store)) {
|
||||
if (!specificStoresToFlush.isEmpty()) {
|
||||
return specificStoresToFlush;
|
||||
}
|
||||
for (HStore store : sloppyStores) {
|
||||
if (shouldFlush(store)) {
|
||||
specificStoresToFlush.add(store);
|
||||
}
|
||||
}
|
||||
if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
|
||||
if (!specificStoresToFlush.isEmpty()) {
|
||||
return specificStoresToFlush;
|
||||
}
|
||||
return region.stores.values();
|
||||
}
|
||||
|
||||
|
@ -59,8 +64,8 @@ public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy {
|
|||
protected void configureForRegion(HRegion region) {
|
||||
super.configureForRegion(region);
|
||||
this.flushSizeLowerBound = getFlushSizeLowerBound(region);
|
||||
for(Store store : region.stores.values()) {
|
||||
if(store.isSloppyMemstore()) {
|
||||
for (HStore store : region.stores.values()) {
|
||||
if (store.isSloppyMemstore()) {
|
||||
sloppyStores.add(store);
|
||||
} else {
|
||||
regularStores.add(store);
|
||||
|
|
|
@ -44,6 +44,6 @@ public abstract class FlushPolicy extends Configured {
|
|||
/**
|
||||
* @return the stores need to be flushed.
|
||||
*/
|
||||
public abstract Collection<Store> selectStoresToFlush();
|
||||
public abstract Collection<HStore> selectStoresToFlush();
|
||||
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -56,8 +56,8 @@ import javax.management.MalformedObjectNameException;
|
|||
import javax.management.ObjectName;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
|
||||
import org.apache.commons.lang3.SystemUtils;
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
import org.apache.commons.lang3.SystemUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil;
|
|||
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
|
||||
|
@ -140,6 +141,9 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
|
|||
import org.apache.hadoop.hbase.security.Superusers;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
||||
|
@ -210,10 +214,6 @@ import org.apache.zookeeper.KeeperException;
|
|||
import org.apache.zookeeper.KeeperException.NoNodeException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
|
||||
|
||||
import sun.misc.Signal;
|
||||
import sun.misc.SignalHandler;
|
||||
|
||||
|
@ -1686,7 +1686,7 @@ public class HRegionServer extends HasThread implements
|
|||
int totalStaticBloomSizeKB = 0;
|
||||
long totalCompactingKVs = 0;
|
||||
long currentCompactedKVs = 0;
|
||||
List<Store> storeList = r.getStores();
|
||||
List<? extends Store> storeList = r.getStores();
|
||||
stores += storeList.size();
|
||||
for (Store store : storeList) {
|
||||
storefiles += store.getStorefilesCount();
|
||||
|
@ -1772,27 +1772,32 @@ public class HRegionServer extends HasThread implements
|
|||
@Override
|
||||
protected void chore() {
|
||||
for (Region r : this.instance.onlineRegions.values()) {
|
||||
if (r == null)
|
||||
if (r == null) {
|
||||
continue;
|
||||
for (Store s : r.getStores()) {
|
||||
}
|
||||
HRegion hr = (HRegion) r;
|
||||
for (HStore s : hr.stores.values()) {
|
||||
try {
|
||||
long multiplier = s.getCompactionCheckMultiplier();
|
||||
assert multiplier > 0;
|
||||
if (iteration % multiplier != 0) continue;
|
||||
if (iteration % multiplier != 0) {
|
||||
continue;
|
||||
}
|
||||
if (s.needsCompaction()) {
|
||||
// Queue a compaction. Will recognize if major is needed.
|
||||
this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
|
||||
+ " requests compaction");
|
||||
this.instance.compactSplitThread.requestSystemCompaction(hr, s,
|
||||
getName() + " requests compaction");
|
||||
} else if (s.isMajorCompaction()) {
|
||||
s.triggerMajorCompaction();
|
||||
if (majorCompactPriority == DEFAULT_PRIORITY
|
||||
|| majorCompactPriority > ((HRegion)r).getCompactPriority()) {
|
||||
this.instance.compactSplitThread.requestCompaction(r, s, getName()
|
||||
+ " requests major compaction; use default priority", null);
|
||||
if (majorCompactPriority == DEFAULT_PRIORITY ||
|
||||
majorCompactPriority > hr.getCompactPriority()) {
|
||||
this.instance.compactSplitThread.requestCompaction(hr, s,
|
||||
getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
|
||||
CompactionLifeCycleTracker.DUMMY, null);
|
||||
} else {
|
||||
this.instance.compactSplitThread.requestCompaction(r, s, getName()
|
||||
+ " requests major compaction; use configured priority",
|
||||
this.majorCompactPriority, null, null);
|
||||
this.instance.compactSplitThread.requestCompaction(hr, s,
|
||||
getName() + " requests major compaction; use configured priority",
|
||||
this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -2146,13 +2151,12 @@ public class HRegionServer extends HasThread implements
|
|||
@Override
|
||||
public void postOpenDeployTasks(final PostOpenDeployContext context)
|
||||
throws KeeperException, IOException {
|
||||
Region r = context.getRegion();
|
||||
HRegion r = (HRegion) context.getRegion();
|
||||
long masterSystemTime = context.getMasterSystemTime();
|
||||
Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
|
||||
rpcServices.checkOpen();
|
||||
LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
|
||||
// Do checks to see if we need to compact (references or too many files)
|
||||
for (Store s : r.getStores()) {
|
||||
for (HStore s : r.stores.values()) {
|
||||
if (s.hasReferences() || s.needsCompaction()) {
|
||||
this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
|
||||
}
|
||||
|
@ -2863,11 +2867,6 @@ public class HRegionServer extends HasThread implements
|
|||
return serverName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactionRequestor getCompactionRequester() {
|
||||
return this.compactSplitThread;
|
||||
}
|
||||
|
||||
public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
|
||||
return this.rsHost;
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
|
@ -52,13 +53,12 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CompoundConfiguration;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.backup.FailedArchiveException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
|
@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
|||
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||
|
@ -82,8 +83,6 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
|||
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
||||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
@ -92,14 +91,16 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableCollection;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
|
||||
|
||||
/**
|
||||
* A Store holds a column family in a Region. Its a memstore and a set of zero
|
||||
|
@ -477,7 +478,7 @@ public class HStore implements Store {
|
|||
/**
|
||||
* @param tabledir {@link Path} to where the table is being stored
|
||||
* @param hri {@link HRegionInfo} for the region.
|
||||
* @param family {@link HColumnDescriptor} describing the column family
|
||||
* @param family {@link ColumnFamilyDescriptor} describing the column family
|
||||
* @return Path to family/Store home directory.
|
||||
*/
|
||||
@Deprecated
|
||||
|
@ -489,7 +490,7 @@ public class HStore implements Store {
|
|||
/**
|
||||
* @param tabledir {@link Path} to where the table is being stored
|
||||
* @param encodedName Encoded region name.
|
||||
* @param family {@link HColumnDescriptor} describing the column family
|
||||
* @param family {@link ColumnFamilyDescriptor} describing the column family
|
||||
* @return Path to family/Store home directory.
|
||||
*/
|
||||
@Deprecated
|
||||
|
@ -1386,15 +1387,14 @@ public class HStore implements Store {
|
|||
}
|
||||
}
|
||||
|
||||
private List<StoreFile> moveCompatedFilesIntoPlace(
|
||||
final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
|
||||
private List<StoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles,
|
||||
User user) throws IOException {
|
||||
List<StoreFile> sfs = new ArrayList<>(newFiles.size());
|
||||
for (Path newFile : newFiles) {
|
||||
assert newFile != null;
|
||||
final StoreFile sf = moveFileIntoPlace(newFile);
|
||||
StoreFile sf = moveFileIntoPlace(newFile);
|
||||
if (this.getCoprocessorHost() != null) {
|
||||
final Store thisStore = this;
|
||||
getCoprocessorHost().postCompact(thisStore, sf, cr, user);
|
||||
getCoprocessorHost().postCompact(this, sf, cr.getTracker(), user);
|
||||
}
|
||||
assert sf != null;
|
||||
sfs.add(sf);
|
||||
|
@ -1636,23 +1636,12 @@ public class HStore implements Store {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompactionContext requestCompaction() throws IOException {
|
||||
return requestCompaction(Store.NO_PRIORITY, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
|
||||
throws IOException {
|
||||
return requestCompaction(priority, baseRequest, null);
|
||||
}
|
||||
@Override
|
||||
public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
|
||||
User user) throws IOException {
|
||||
public Optional<CompactionContext> requestCompaction(int priority,
|
||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
// don't even select for compaction if writes are disabled
|
||||
if (!this.areWritesEnabled()) {
|
||||
return null;
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
// Before we do compaction, try to get rid of unneeded files to simplify things.
|
||||
removeUnneededFiles();
|
||||
|
||||
|
@ -1666,7 +1655,7 @@ public class HStore implements Store {
|
|||
final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
|
||||
boolean override = false;
|
||||
override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
|
||||
baseRequest, user);
|
||||
tracker, user);
|
||||
if (override) {
|
||||
// Coprocessor is overriding normal file selection.
|
||||
compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
|
||||
|
@ -1695,21 +1684,13 @@ public class HStore implements Store {
|
|||
}
|
||||
if (this.getCoprocessorHost() != null) {
|
||||
this.getCoprocessorHost().postCompactSelection(
|
||||
this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest, user);
|
||||
}
|
||||
|
||||
// Selected files; see if we have a compaction with some custom base request.
|
||||
if (baseRequest != null) {
|
||||
// Update the request with what the system thinks the request should be;
|
||||
// its up to the request if it wants to listen.
|
||||
compaction.forceSelect(
|
||||
baseRequest.combineWith(compaction.getRequest()));
|
||||
this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, user);
|
||||
}
|
||||
// Finally, we have the resulting files list. Check if we have any files at all.
|
||||
request = compaction.getRequest();
|
||||
final Collection<StoreFile> selectedFiles = request.getFiles();
|
||||
Collection<StoreFile> selectedFiles = request.getFiles();
|
||||
if (selectedFiles.isEmpty()) {
|
||||
return null;
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
addToCompactingFiles(selectedFiles);
|
||||
|
@ -1721,6 +1702,7 @@ public class HStore implements Store {
|
|||
// Set priority, either override value supplied by caller or from store.
|
||||
request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
|
||||
request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
|
||||
request.setTracker(tracker);
|
||||
}
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
|
@ -1730,7 +1712,7 @@ public class HStore implements Store {
|
|||
+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
|
||||
+ (request.isAllFiles() ? " (all files)" : ""));
|
||||
this.region.reportCompactionRequestStart(request.isMajor());
|
||||
return compaction;
|
||||
return Optional.of(compaction);
|
||||
}
|
||||
|
||||
/** Adds the files to compacting files. filesCompacting must be locked. */
|
||||
|
|
|
@ -20,11 +20,8 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import static org.apache.hadoop.util.StringUtils.humanReadableInt;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.Thread.UncaughtExceptionHandler;
|
||||
import java.lang.management.MemoryType;
|
||||
import java.util.ArrayList;
|
||||
import java.util.ConcurrentModificationException;
|
||||
import java.util.HashMap;
|
||||
|
@ -50,6 +47,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
|
@ -448,8 +446,8 @@ class MemStoreFlusher implements FlushRequester {
|
|||
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
|
||||
if (!this.server.compactSplitThread.requestSplit(region)) {
|
||||
try {
|
||||
this.server.compactSplitThread.requestSystemCompaction(
|
||||
region, Thread.currentThread().getName());
|
||||
this.server.compactSplitThread.requestSystemCompaction((HRegion) region,
|
||||
Thread.currentThread().getName());
|
||||
} catch (IOException e) {
|
||||
e = e instanceof RemoteException ?
|
||||
((RemoteException)e).unwrapRemoteException() : e;
|
||||
|
@ -503,8 +501,8 @@ class MemStoreFlusher implements FlushRequester {
|
|||
if (shouldSplit) {
|
||||
this.server.compactSplitThread.requestSplit(region);
|
||||
} else if (shouldCompact) {
|
||||
server.compactSplitThread.requestSystemCompaction(
|
||||
region, Thread.currentThread().getName());
|
||||
server.compactSplitThread.requestSystemCompaction((HRegion) region,
|
||||
Thread.currentThread().getName());
|
||||
}
|
||||
} catch (DroppedSnapshotException ex) {
|
||||
// Cache flush can fail in a few places. If it fails in a critical
|
||||
|
|
|
@ -761,7 +761,7 @@ class MetricsRegionServerWrapperImpl
|
|||
tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed();
|
||||
tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed();
|
||||
tempBlockedRequestsCount += r.getBlockedRequestsCount();
|
||||
List<Store> storeList = r.getStores();
|
||||
List<? extends Store> storeList = r.getStores();
|
||||
tempNumStores += storeList.size();
|
||||
for (Store store : storeList) {
|
||||
tempNumStoreFiles += store.getStorefilesCount();
|
||||
|
|
|
@ -95,7 +95,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
|
|||
|
||||
@Override
|
||||
public long getNumStores() {
|
||||
Map<byte[],Store> stores = this.region.stores;
|
||||
Map<byte[], HStore> stores = this.region.stores;
|
||||
if (stores == null) {
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.regionserver.Leases.Lease;
|
|||
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
|
||||
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
|
||||
|
@ -1538,7 +1539,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
try {
|
||||
checkOpen();
|
||||
requestCount.increment();
|
||||
Region region = getRegion(request.getRegion());
|
||||
HRegion region = (HRegion) getRegion(request.getRegion());
|
||||
// Quota support is enabled, the requesting user is not system/super user
|
||||
// and a quota policy is enforced that disables compactions.
|
||||
if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
|
||||
|
@ -1552,7 +1553,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
|
||||
boolean major = false;
|
||||
byte [] family = null;
|
||||
Store store = null;
|
||||
HStore store = null;
|
||||
if (request.hasFamily()) {
|
||||
family = request.getFamily().toByteArray();
|
||||
store = region.getStore(family);
|
||||
|
@ -1579,12 +1580,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
+ region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
|
||||
}
|
||||
String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
|
||||
if(family != null) {
|
||||
regionServer.compactSplitThread.requestCompaction(region, store, log,
|
||||
Store.PRIORITY_USER, null, RpcServer.getRequestUser());
|
||||
if (family != null) {
|
||||
regionServer.compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER,
|
||||
CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser());
|
||||
} else {
|
||||
regionServer.compactSplitThread.requestCompaction(region, log,
|
||||
Store.PRIORITY_USER, null, RpcServer.getRequestUser());
|
||||
regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER,
|
||||
CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser());
|
||||
}
|
||||
return CompactRegionResponse.newBuilder().build();
|
||||
} catch (IOException ie) {
|
||||
|
@ -1606,7 +1607,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
try {
|
||||
checkOpen();
|
||||
requestCount.increment();
|
||||
Region region = getRegion(request.getRegion());
|
||||
HRegion region = (HRegion) getRegion(request.getRegion());
|
||||
LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
|
||||
boolean shouldFlush = true;
|
||||
if (request.hasIfOlderThanTs()) {
|
||||
|
@ -1617,8 +1618,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
|
||||
request.getWriteFlushWalMarker() : false;
|
||||
// Go behind the curtain so we can manage writing of the flush WAL marker
|
||||
HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
|
||||
((HRegion)region).flushcache(true, writeFlushWalMarker);
|
||||
HRegion.FlushResultImpl flushResult = region.flushcache(true, writeFlushWalMarker);
|
||||
boolean compactionNeeded = flushResult.isCompactionNeeded();
|
||||
if (compactionNeeded) {
|
||||
regionServer.compactSplitThread.requestSystemCompaction(region,
|
||||
|
|
|
@ -23,10 +23,12 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
|
@ -41,13 +43,17 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
|
|||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Service;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Service;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
|
||||
|
||||
/**
|
||||
* Regions store data for a certain region of a table. It stores all columns
|
||||
|
@ -105,7 +111,7 @@ public interface Region extends ConfigurationObserver {
|
|||
* <p>Use with caution. Exposed for use of fixup utilities.
|
||||
* @return a list of the Stores managed by this region
|
||||
*/
|
||||
List<Store> getStores();
|
||||
List<? extends Store> getStores();
|
||||
|
||||
/**
|
||||
* Return the Store for the given family
|
||||
|
@ -115,7 +121,7 @@ public interface Region extends ConfigurationObserver {
|
|||
Store getStore(byte[] family);
|
||||
|
||||
/** @return list of store file names for the given families */
|
||||
List<String> getStoreFileList(byte [][] columns);
|
||||
List<String> getStoreFileList(byte[][] columns);
|
||||
|
||||
/**
|
||||
* Check the region's underlying store files, open the files that have not
|
||||
|
@ -753,6 +759,18 @@ public interface Region extends ConfigurationObserver {
|
|||
*/
|
||||
CompactionState getCompactionState();
|
||||
|
||||
/**
|
||||
* Request compaction on this region.
|
||||
*/
|
||||
void requestCompaction(String why, int priority, CompactionLifeCycleTracker tracker, User user)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Request compaction for the given family
|
||||
*/
|
||||
void requestCompaction(byte[] family, String why, int priority,
|
||||
CompactionLifeCycleTracker tracker, User user) throws IOException;
|
||||
|
||||
/** Wait for all current flushes and compactions of the region to complete */
|
||||
void waitForFlushesAndCompactions();
|
||||
|
||||
|
|
|
@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
|||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.metrics.MetricRegistry;
|
||||
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
@ -499,18 +500,18 @@ public class RegionCoprocessorHost
|
|||
/**
|
||||
* See
|
||||
* {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
|
||||
* InternalScanner, CompactionRequest, long)}
|
||||
* InternalScanner, CompactionLifeCycleTracker, long)}
|
||||
*/
|
||||
public InternalScanner preCompactScannerOpen(final Store store,
|
||||
final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
|
||||
final CompactionRequest request, final User user, final long readPoint) throws IOException {
|
||||
public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
|
||||
ScanType scanType, long earliestPutTs, CompactionLifeCycleTracker tracker, User user,
|
||||
long readPoint) throws IOException {
|
||||
return execOperationWithResult(null,
|
||||
coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
|
||||
@Override
|
||||
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||
throws IOException {
|
||||
setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
|
||||
earliestPutTs, getResult(), request, readPoint));
|
||||
earliestPutTs, getResult(), tracker, readPoint));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -520,17 +521,17 @@ public class RegionCoprocessorHost
|
|||
* available candidates.
|
||||
* @param store The store where compaction is being requested
|
||||
* @param candidates The currently available store files
|
||||
* @param request custom compaction request
|
||||
* @param tracker used to track the life cycle of a compaction
|
||||
* @return If {@code true}, skip the normal selection process and use the current list
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
|
||||
final CompactionRequest request, final User user) throws IOException {
|
||||
public boolean preCompactSelection(Store store, List<StoreFile> candidates,
|
||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
|
||||
@Override
|
||||
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||
throws IOException {
|
||||
oserver.preCompactSelection(ctx, store, candidates, request);
|
||||
oserver.preCompactSelection(ctx, store, candidates, tracker);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -540,21 +541,17 @@ public class RegionCoprocessorHost
|
|||
* candidates.
|
||||
* @param store The store where compaction is being requested
|
||||
* @param selected The store files selected to compact
|
||||
* @param request custom compaction
|
||||
* @param tracker used to track the life cycle of a compaction
|
||||
*/
|
||||
public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
|
||||
final CompactionRequest request, final User user) {
|
||||
try {
|
||||
public void postCompactSelection(Store store, ImmutableList<StoreFile> selected,
|
||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
|
||||
@Override
|
||||
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||
throws IOException {
|
||||
oserver.postCompactSelection(ctx, store, selected, request);
|
||||
oserver.postCompactSelection(ctx, store, selected, tracker);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
LOG.warn(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -562,18 +559,17 @@ public class RegionCoprocessorHost
|
|||
* @param store the store being compacted
|
||||
* @param scanner the scanner used to read store data during compaction
|
||||
* @param scanType type of Scan
|
||||
* @param request the compaction that will be executed
|
||||
* @param tracker used to track the life cycle of a compaction
|
||||
* @throws IOException
|
||||
*/
|
||||
public InternalScanner preCompact(final Store store, final InternalScanner scanner,
|
||||
final ScanType scanType, final CompactionRequest request, final User user)
|
||||
throws IOException {
|
||||
public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType,
|
||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||
return execOperationWithResult(false, scanner,
|
||||
coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
|
||||
@Override
|
||||
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||
throws IOException {
|
||||
setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
|
||||
setResult(oserver.preCompact(ctx, store, getResult(), scanType, tracker));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -582,16 +578,16 @@ public class RegionCoprocessorHost
|
|||
* Called after the store compaction has completed.
|
||||
* @param store the store being compacted
|
||||
* @param resultFile the new store file written during compaction
|
||||
* @param request the compaction that is being executed
|
||||
* @param tracker used to track the life cycle of a compaction
|
||||
* @throws IOException
|
||||
*/
|
||||
public void postCompact(final Store store, final StoreFile resultFile,
|
||||
final CompactionRequest request, final User user) throws IOException {
|
||||
public void postCompact(Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker,
|
||||
User user) throws IOException {
|
||||
execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
|
||||
@Override
|
||||
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
|
||||
throws IOException {
|
||||
oserver.postCompact(ctx, store, resultFile, request);
|
||||
oserver.postCompact(ctx, store, resultFile, tracker);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -63,11 +63,6 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
|
|||
*/
|
||||
List<WAL> getWALs() throws IOException;
|
||||
|
||||
/**
|
||||
* @return Implementation of {@link CompactionRequestor} or null.
|
||||
*/
|
||||
CompactionRequestor getCompactionRequester();
|
||||
|
||||
/**
|
||||
* @return Implementation of {@link FlushRequester} or null.
|
||||
*/
|
||||
|
|
|
@ -76,7 +76,7 @@ public abstract class RegionSplitPolicy extends Configured {
|
|||
if (explicitSplitPoint != null) {
|
||||
return explicitSplitPoint;
|
||||
}
|
||||
List<Store> stores = region.getStores();
|
||||
List<HStore> stores = region.getStores();
|
||||
|
||||
byte[] splitPointFromLargestStore = null;
|
||||
long largestStoreSize = 0;
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
|
@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
|
|||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
||||
|
@ -51,7 +53,8 @@ import org.apache.hadoop.hbase.security.User;
|
|||
@InterfaceStability.Evolving
|
||||
public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
|
||||
|
||||
/* The default priority for user-specified compaction requests.
|
||||
/**
|
||||
* The default priority for user-specified compaction requests.
|
||||
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
|
||||
*/
|
||||
int PRIORITY_USER = 1;
|
||||
|
@ -253,17 +256,12 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
|
|||
*/
|
||||
CompactionProgress getCompactionProgress();
|
||||
|
||||
CompactionContext requestCompaction() throws IOException;
|
||||
default Optional<CompactionContext> requestCompaction() throws IOException {
|
||||
return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated see requestCompaction(int, CompactionRequest, User)
|
||||
*/
|
||||
@Deprecated
|
||||
CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
|
||||
throws IOException;
|
||||
|
||||
CompactionContext requestCompaction(int priority, CompactionRequest baseRequest, User user)
|
||||
throws IOException;
|
||||
Optional<CompactionContext> requestCompaction(int priority, CompactionLifeCycleTracker tracker,
|
||||
User user) throws IOException;
|
||||
|
||||
void cancelRequestedCompaction(CompactionContext compaction);
|
||||
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Used to track compaction execution.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
|
||||
@InterfaceStability.Evolving
|
||||
public interface CompactionLifeCycleTracker {
|
||||
|
||||
static CompactionLifeCycleTracker DUMMY = new CompactionLifeCycleTracker() {
|
||||
};
|
||||
|
||||
/**
|
||||
* Called before compaction is executed by CompactSplitThread.
|
||||
* <p>
|
||||
* Requesting compaction on a region can lead to multiple compactions on different stores, so we
|
||||
* will pass the {@link Store} in to tell you the store we operate on.
|
||||
*/
|
||||
default void beforeExecute(Store store) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after compaction is executed by CompactSplitThread.
|
||||
* <p>
|
||||
* Requesting compaction on a region can lead to multiple compactions on different stores, so we
|
||||
* will pass the {@link Store} in to tell you the store we operate on.
|
||||
*/
|
||||
default void afterExecute(Store store) {
|
||||
}
|
||||
}
|
|
@ -18,25 +18,21 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* This class holds all logical details necessary to run a compaction.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({ "coprocessor" })
|
||||
@InterfaceStability.Evolving
|
||||
@InterfaceAudience.Private
|
||||
public class CompactionRequest {
|
||||
|
||||
// was this compaction promoted to an off-peak
|
||||
|
@ -53,58 +49,20 @@ public class CompactionRequest {
|
|||
private String regionName = "";
|
||||
private String storeName = "";
|
||||
private long totalSize = -1L;
|
||||
|
||||
/**
|
||||
* This ctor should be used by coprocessors that want to subclass CompactionRequest.
|
||||
*/
|
||||
public CompactionRequest() {
|
||||
this.selectionTime = EnvironmentEdgeManager.currentTime();
|
||||
this.timeInNanos = System.nanoTime();
|
||||
}
|
||||
private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
|
||||
|
||||
public CompactionRequest(Collection<StoreFile> files) {
|
||||
this();
|
||||
Preconditions.checkNotNull(files);
|
||||
this.filesToCompact = files;
|
||||
this.selectionTime = EnvironmentEdgeManager.currentTime();
|
||||
this.timeInNanos = System.nanoTime();
|
||||
this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not null");
|
||||
recalculateSize();
|
||||
}
|
||||
|
||||
public void updateFiles(Collection<StoreFile> files) {
|
||||
this.filesToCompact = files;
|
||||
this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not null");
|
||||
recalculateSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before compaction is executed by CompactSplitThread; for use by coproc subclasses.
|
||||
*/
|
||||
public void beforeExecute() {}
|
||||
|
||||
/**
|
||||
* Called after compaction is executed by CompactSplitThread; for use by coproc subclasses.
|
||||
*/
|
||||
public void afterExecute() {}
|
||||
|
||||
/**
|
||||
* Combines the request with other request. Coprocessors subclassing CR may override
|
||||
* this if they want to do clever things based on CompactionPolicy selection that
|
||||
* is passed to this method via "other". The default implementation just does a copy.
|
||||
* @param other Request to combine with.
|
||||
* @return The result (may be "this" or "other").
|
||||
*/
|
||||
public CompactionRequest combineWith(CompactionRequest other) {
|
||||
this.filesToCompact = new ArrayList<>(other.getFiles());
|
||||
this.isOffPeak = other.isOffPeak;
|
||||
this.isMajor = other.isMajor;
|
||||
this.priority = other.priority;
|
||||
this.selectionTime = other.selectionTime;
|
||||
this.timeInNanos = other.timeInNanos;
|
||||
this.regionName = other.regionName;
|
||||
this.storeName = other.storeName;
|
||||
this.totalSize = other.totalSize;
|
||||
recalculateSize();
|
||||
return this;
|
||||
}
|
||||
|
||||
public Collection<StoreFile> getFiles() {
|
||||
return this.filesToCompact;
|
||||
}
|
||||
|
@ -168,6 +126,14 @@ public class CompactionRequest {
|
|||
: (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
|
||||
}
|
||||
|
||||
public void setTracker(CompactionLifeCycleTracker tracker) {
|
||||
this.tracker = tracker;
|
||||
}
|
||||
|
||||
public CompactionLifeCycleTracker getTracker() {
|
||||
return tracker;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
String fsList = filesToCompact.stream().filter(f -> f.getReader() != null)
|
||||
|
@ -186,12 +152,7 @@ public class CompactionRequest {
|
|||
* @param files files that should be included in the compaction
|
||||
*/
|
||||
private void recalculateSize() {
|
||||
long sz = 0;
|
||||
for (StoreFile sf : this.filesToCompact) {
|
||||
StoreFileReader r = sf.getReader();
|
||||
sz += r == null ? 0 : r.length();
|
||||
}
|
||||
this.totalSize = sz;
|
||||
this.totalSize = filesToCompact.stream().map(StoreFile::getReader)
|
||||
.mapToLong(r -> r != null ? r.length() : 0L).sum();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -338,14 +338,14 @@ public abstract class Compactor<T extends CellSink> {
|
|||
* @param readPoint the read point to help create scanner by Coprocessor if required.
|
||||
* @return Scanner override by coprocessor; null if not overriding.
|
||||
*/
|
||||
protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
|
||||
final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
|
||||
User user, final long readPoint) throws IOException {
|
||||
protected InternalScanner preCreateCoprocScanner(CompactionRequest request, ScanType scanType,
|
||||
long earliestPutTs, List<StoreFileScanner> scanners, User user, long readPoint)
|
||||
throws IOException {
|
||||
if (store.getCoprocessorHost() == null) {
|
||||
return null;
|
||||
}
|
||||
return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
|
||||
earliestPutTs, request, user, readPoint);
|
||||
earliestPutTs, request.getTracker(), user, readPoint);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -355,12 +355,13 @@ public abstract class Compactor<T extends CellSink> {
|
|||
* @param scanner The default scanner created for compaction.
|
||||
* @return Scanner scanner to use (usually the default); null if compaction should not proceed.
|
||||
*/
|
||||
protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
|
||||
final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
|
||||
protected InternalScanner postCreateCoprocScanner(CompactionRequest request, ScanType scanType,
|
||||
InternalScanner scanner, User user) throws IOException {
|
||||
if (store.getCoprocessorHost() == null) {
|
||||
return scanner;
|
||||
}
|
||||
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request, user);
|
||||
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
|
||||
user);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
|||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.ScannerContext;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
|
@ -1532,7 +1533,8 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
|
|||
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException {
|
||||
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
|
||||
throws IOException {
|
||||
requirePermission(getActiveUser(c), "compact", getTableName(c.getEnvironment()), null, null,
|
||||
Action.ADMIN, Action.CREATE);
|
||||
return scanner;
|
||||
|
|
|
@ -92,7 +92,7 @@
|
|||
</div>
|
||||
|
||||
<% if(region != null) { //
|
||||
List<Store> stores = region.getStores();
|
||||
List<? extends Store> stores = region.getStores();
|
||||
for (Store store : stores) {
|
||||
String cf = store.getColumnFamilyName();
|
||||
Collection<StoreFile> storeFiles = store.getStorefiles(); %>
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collections;
|
||||
|
@ -36,11 +38,8 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
|
|||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
||||
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
|
||||
import org.apache.hadoop.hbase.regionserver.Leases;
|
||||
|
@ -51,14 +50,14 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
|||
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
|
||||
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
/**
|
||||
* Basic mock region server services. Should only be instantiated by HBaseTestingUtility.b
|
||||
*/
|
||||
|
@ -159,11 +158,6 @@ public class MockRegionServerServices implements RegionServerServices {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactionRequestor getCompactionRequester() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterConnection getConnection() {
|
||||
return null;
|
||||
|
|
|
@ -124,7 +124,7 @@ public class TestIOFencing {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean compact(CompactionContext compaction, Store store,
|
||||
public boolean compact(CompactionContext compaction, HStore store,
|
||||
ThroughputController throughputController) throws IOException {
|
||||
try {
|
||||
return super.compact(compaction, store, throughputController);
|
||||
|
@ -134,7 +134,7 @@ public class TestIOFencing {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean compact(CompactionContext compaction, Store store,
|
||||
public boolean compact(CompactionContext compaction, HStore store,
|
||||
ThroughputController throughputController, User user) throws IOException {
|
||||
try {
|
||||
return super.compact(compaction, store, throughputController, user);
|
||||
|
|
|
@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
|
|||
import org.apache.hadoop.hbase.regionserver.ScannerContext;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -256,7 +256,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
|
|||
@Override
|
||||
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
|
||||
InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
|
||||
InternalScanner s, CompactionLifeCycleTracker request, long readPoint) throws IOException {
|
||||
return createCompactorScanner(store, scanners, scanType, earliestPutTs);
|
||||
}
|
||||
|
||||
|
|
|
@ -784,7 +784,7 @@ public class TestBlockEvictionFromClient {
|
|||
}
|
||||
|
||||
private BlockCache setCacheProperties(Region region) {
|
||||
Iterator<Store> strItr = region.getStores().iterator();
|
||||
Iterator<? extends Store> strItr = region.getStores().iterator();
|
||||
BlockCache cache = null;
|
||||
while (strItr.hasNext()) {
|
||||
Store store = strItr.next();
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
|
|||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
@ -203,20 +204,20 @@ public class SimpleRegionObserver implements RegionObserver {
|
|||
|
||||
@Override
|
||||
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
List<StoreFile> candidates, CompactionRequest request) throws IOException {
|
||||
List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {
|
||||
ctPreCompactSelect.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
ImmutableList<StoreFile> selected, CompactionRequest request) {
|
||||
ImmutableList<StoreFile> selected, CompactionLifeCycleTracker tracker) {
|
||||
ctPostCompactSelect.incrementAndGet();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException {
|
||||
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
|
||||
throws IOException {
|
||||
ctPreCompact.incrementAndGet();
|
||||
return scanner;
|
||||
}
|
||||
|
@ -224,14 +225,14 @@ public class SimpleRegionObserver implements RegionObserver {
|
|||
@Override
|
||||
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
|
||||
InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
|
||||
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
|
||||
ctPreCompactScanner.incrementAndGet();
|
||||
return s;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
StoreFile resultFile, CompactionRequest request) throws IOException {
|
||||
StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
|
||||
ctPostCompact.incrementAndGet();
|
||||
}
|
||||
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
|
|||
import org.apache.hadoop.hbase.regionserver.ScannerContext;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
|
@ -194,13 +195,13 @@ public class TestCoprocessorInterface {
|
|||
}
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
Store store, InternalScanner scanner, ScanType scanType, CompactionRequest request) {
|
||||
Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) {
|
||||
preCompactCalled = true;
|
||||
return scanner;
|
||||
}
|
||||
@Override
|
||||
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
|
||||
Store store, StoreFile resultFile, CompactionRequest request) {
|
||||
Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker) {
|
||||
postCompactCalled = true;
|
||||
}
|
||||
@Override
|
||||
|
|
|
@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
|
|||
import org.apache.hadoop.hbase.regionserver.ScannerContext;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
|
||||
|
@ -417,7 +417,7 @@ public class TestRegionObserverInterface {
|
|||
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
|
||||
final InternalScanner scanner, final ScanType scanType, CompactionRequest request) {
|
||||
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) {
|
||||
return new InternalScanner() {
|
||||
@Override
|
||||
public boolean next(List<Cell> results) throws IOException {
|
||||
|
@ -456,7 +456,7 @@ public class TestRegionObserverInterface {
|
|||
|
||||
@Override
|
||||
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
|
||||
StoreFile resultFile, CompactionRequest request) {
|
||||
StoreFile resultFile, CompactionLifeCycleTracker tracker) {
|
||||
lastCompaction = EnvironmentEdgeManager.currentTime();
|
||||
}
|
||||
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.filter.FilterBase;
|
|||
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
|
||||
|
@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
|
|||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
@ -155,7 +157,7 @@ public class TestRegionObserverScannerOpenHook {
|
|||
@Override
|
||||
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
|
||||
long earliestPutTs, InternalScanner s, CompactionRequest request, long readPoint)
|
||||
long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint)
|
||||
throws IOException {
|
||||
scanners.forEach(KeyValueScanner::close);
|
||||
return NO_DATA;
|
||||
|
@ -252,7 +254,7 @@ public class TestRegionObserverScannerOpenHook {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean compact(CompactionContext compaction, Store store,
|
||||
public boolean compact(CompactionContext compaction, HStore store,
|
||||
ThroughputController throughputController) throws IOException {
|
||||
boolean ret = super.compact(compaction, store, throughputController);
|
||||
if (ret) compactionStateChangeLatch.countDown();
|
||||
|
@ -260,7 +262,7 @@ public class TestRegionObserverScannerOpenHook {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean compact(CompactionContext compaction, Store store,
|
||||
public boolean compact(CompactionContext compaction, HStore store,
|
||||
ThroughputController throughputController, User user) throws IOException {
|
||||
boolean ret = super.compact(compaction, store, throughputController, user);
|
||||
if (ret) compactionStateChangeLatch.countDown();
|
||||
|
|
|
@ -47,6 +47,27 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
|
|||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
|
||||
import org.apache.hadoop.hbase.regionserver.Leases;
|
||||
import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
|
||||
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
|
||||
|
@ -61,10 +82,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
|
||||
|
@ -102,27 +123,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRespon
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
|
||||
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
|
||||
import org.apache.hadoop.hbase.regionserver.Leases;
|
||||
import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
|
||||
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* A mock RegionServer implementation.
|
||||
|
@ -314,12 +314,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
|||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactionRequestor getCompactionRequester() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlushRequester getFlushRequester() {
|
||||
// TODO Auto-generated method stub
|
||||
|
|
|
@ -86,7 +86,7 @@ import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
|||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
|
@ -716,14 +716,14 @@ public class TestMobCompactor {
|
|||
}
|
||||
|
||||
/**
|
||||
* This copro overwrites the default compaction policy. It always chooses two latest
|
||||
* hfiles and compacts them into a new one.
|
||||
* This copro overwrites the default compaction policy. It always chooses two latest hfiles and
|
||||
* compacts them into a new one.
|
||||
*/
|
||||
public static class CompactTwoLatestHfilesCopro implements RegionObserver {
|
||||
|
||||
@Override
|
||||
public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
|
||||
final Store store, final List<StoreFile> candidates, final CompactionRequest request)
|
||||
throws IOException {
|
||||
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {
|
||||
|
||||
int count = candidates.size();
|
||||
if (count >= 2) {
|
||||
|
|
|
@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil;
|
|||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -461,7 +462,7 @@ public class TestNamespaceAuditor {
|
|||
|
||||
@Override
|
||||
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
|
||||
StoreFile resultFile, CompactionRequest request) throws IOException {
|
||||
StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
|
||||
postCompact.countDown();
|
||||
}
|
||||
|
||||
|
|
|
@ -36,9 +36,9 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -324,7 +324,7 @@ public class TestFileSystemUtilizationChore {
|
|||
final HRegionInfo info = mock(HRegionInfo.class);
|
||||
when(r.getRegionInfo()).thenReturn(info);
|
||||
List<Store> stores = new ArrayList<>();
|
||||
when(r.getStores()).thenReturn(stores);
|
||||
when(r.getStores()).thenReturn((List) stores);
|
||||
for (Long storeSize : storeSizes) {
|
||||
final Store s = mock(Store.class);
|
||||
stores.add(s);
|
||||
|
@ -338,7 +338,7 @@ public class TestFileSystemUtilizationChore {
|
|||
final HRegionInfo info = mock(HRegionInfo.class);
|
||||
when(r.getRegionInfo()).thenReturn(info);
|
||||
List<Store> stores = new ArrayList<>();
|
||||
when(r.getStores()).thenReturn(stores);
|
||||
when(r.getStores()).thenReturn((List) stores);
|
||||
assertEquals(
|
||||
"Logic error, storeSizes and linkSizes must be equal in size", storeSizes.size(),
|
||||
hfileSizes.size());
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.TestFromClientSideWithCoprocessor;
|
|||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
|
||||
/**
|
||||
|
@ -61,7 +62,7 @@ public class NoOpScanPolicyObserver implements RegionObserver {
|
|||
public InternalScanner preCompactScannerOpen(
|
||||
final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
|
||||
InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
|
||||
InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
|
||||
// this demonstrates how to override the scanners default behavior
|
||||
ScanInfo oldSI = store.getScanInfo();
|
||||
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(),
|
||||
|
|
|
@ -18,11 +18,16 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.mockito.Matchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -33,15 +38,23 @@ import org.mockito.stubbing.Answer;
|
|||
*/
|
||||
public class StatefulStoreMockMaker {
|
||||
// Add and expand the methods and answers as needed.
|
||||
public CompactionContext selectCompaction() { return null; }
|
||||
public void cancelCompaction(Object originalContext) {}
|
||||
public int getPriority() { return 0; }
|
||||
public Optional<CompactionContext> selectCompaction() {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
private class SelectAnswer implements Answer<CompactionContext> {
|
||||
public CompactionContext answer(InvocationOnMock invocation) throws Throwable {
|
||||
public void cancelCompaction(Object originalContext) {
|
||||
}
|
||||
|
||||
public int getPriority() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
private class SelectAnswer implements Answer<Optional<CompactionContext>> {
|
||||
public Optional<CompactionContext> answer(InvocationOnMock invocation) throws Throwable {
|
||||
return selectCompaction();
|
||||
}
|
||||
}
|
||||
|
||||
private class PriorityAnswer implements Answer<Integer> {
|
||||
public Integer answer(InvocationOnMock invocation) throws Throwable {
|
||||
return getPriority();
|
||||
|
@ -53,15 +66,13 @@ public class StatefulStoreMockMaker {
|
|||
}
|
||||
}
|
||||
|
||||
public Store createStoreMock(String name) throws Exception {
|
||||
Store store = mock(Store.class, name);
|
||||
when(store.requestCompaction(
|
||||
anyInt(), isNull(CompactionRequest.class))).then(new SelectAnswer());
|
||||
when(store.requestCompaction(
|
||||
anyInt(), isNull(CompactionRequest.class), any(User.class))).then(new SelectAnswer());
|
||||
public HStore createStoreMock(String name) throws Exception {
|
||||
HStore store = mock(HStore.class, name);
|
||||
when(store.requestCompaction(anyInt(), any(CompactionLifeCycleTracker.class), any(User.class)))
|
||||
.then(new SelectAnswer());
|
||||
when(store.getCompactPriority()).then(new PriorityAnswer());
|
||||
doAnswer(new CancelAnswer()).when(
|
||||
store).cancelRequestedCompaction(any(CompactionContext.class));
|
||||
doAnswer(new CancelAnswer()).when(store)
|
||||
.cancelRequestedCompaction(any(CompactionContext.class));
|
||||
return store;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,8 +33,8 @@ import static org.mockito.Mockito.when;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.Put;
|
|||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
|
||||
|
@ -65,7 +66,6 @@ import org.apache.hadoop.hbase.security.User;
|
|||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.junit.After;
|
||||
|
@ -298,15 +298,16 @@ public class TestCompaction {
|
|||
Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
|
||||
|
||||
// setup a region/store with some files
|
||||
Store store = r.getStore(COLUMN_FAMILY);
|
||||
HStore store = r.getStore(COLUMN_FAMILY);
|
||||
createStoreFile(r);
|
||||
for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
|
||||
createStoreFile(r);
|
||||
}
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
|
||||
thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request,null);
|
||||
Tracker tracker = new Tracker(latch);
|
||||
thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, tracker,
|
||||
null);
|
||||
// wait for the latch to complete.
|
||||
latch.await();
|
||||
|
||||
|
@ -322,7 +323,7 @@ public class TestCompaction {
|
|||
Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
|
||||
|
||||
// setup a region/store with some files
|
||||
Store store = r.getStore(COLUMN_FAMILY);
|
||||
HStore store = r.getStore(COLUMN_FAMILY);
|
||||
createStoreFile(r);
|
||||
for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
|
||||
createStoreFile(r);
|
||||
|
@ -337,9 +338,9 @@ public class TestCompaction {
|
|||
long preFailedCount = metricsWrapper.getNumCompactionsFailed();
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
|
||||
Tracker tracker = new Tracker(latch);
|
||||
thread.requestCompaction(mockRegion, store, "test custom comapction", Store.PRIORITY_USER,
|
||||
request, null);
|
||||
tracker, null);
|
||||
// wait for the latch to complete.
|
||||
latch.await(120, TimeUnit.SECONDS);
|
||||
|
||||
|
@ -370,20 +371,17 @@ public class TestCompaction {
|
|||
|
||||
// setup a region/store with some files
|
||||
int numStores = r.getStores().size();
|
||||
List<Pair<CompactionRequest, Store>> requests = new ArrayList<>(numStores);
|
||||
CountDownLatch latch = new CountDownLatch(numStores);
|
||||
Tracker tracker = new Tracker(latch);
|
||||
// create some store files and setup requests for each store on which we want to do a
|
||||
// compaction
|
||||
for (Store store : r.getStores()) {
|
||||
for (HStore store : r.getStores()) {
|
||||
createStoreFile(r, store.getColumnFamilyName());
|
||||
createStoreFile(r, store.getColumnFamilyName());
|
||||
createStoreFile(r, store.getColumnFamilyName());
|
||||
requests.add(new Pair<>(new TrackableCompactionRequest(latch), store));
|
||||
thread.requestCompaction(r, store, "test mulitple custom comapctions", Store.PRIORITY_USER,
|
||||
tracker, null);
|
||||
}
|
||||
|
||||
thread.requestCompaction(r, "test mulitple custom comapctions", Store.PRIORITY_USER,
|
||||
Collections.unmodifiableList(requests), null);
|
||||
|
||||
// wait for the latch to complete.
|
||||
latch.await();
|
||||
|
||||
|
@ -428,7 +426,7 @@ public class TestCompaction {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized CompactionContext selectCompaction() {
|
||||
public synchronized Optional<CompactionContext> selectCompaction() {
|
||||
CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
|
||||
compacting.addAll(notCompacting);
|
||||
notCompacting.clear();
|
||||
|
@ -437,7 +435,7 @@ public class TestCompaction {
|
|||
} catch (IOException ex) {
|
||||
fail("Shouldn't happen");
|
||||
}
|
||||
return ctx;
|
||||
return Optional.of(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -499,14 +497,14 @@ public class TestCompaction {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompactionContext selectCompaction() {
|
||||
public Optional<CompactionContext> selectCompaction() {
|
||||
this.blocked = new BlockingCompactionContext();
|
||||
try {
|
||||
this.blocked.select(null, false, false, false);
|
||||
} catch (IOException ex) {
|
||||
fail("Shouldn't happen");
|
||||
}
|
||||
return this.blocked;
|
||||
return Optional.of(blocked);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -527,13 +525,13 @@ public class TestCompaction {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Store createStoreMock(String name) throws Exception {
|
||||
public HStore createStoreMock(String name) throws Exception {
|
||||
return createStoreMock(Integer.MIN_VALUE, name);
|
||||
}
|
||||
|
||||
public Store createStoreMock(int priority, String name) throws Exception {
|
||||
public HStore createStoreMock(int priority, String name) throws Exception {
|
||||
// Override the mock to always return the specified priority.
|
||||
Store s = super.createStoreMock(name);
|
||||
HStore s = super.createStoreMock(name);
|
||||
when(s.getCompactPriority()).thenReturn(priority);
|
||||
return s;
|
||||
}
|
||||
|
@ -555,7 +553,7 @@ public class TestCompaction {
|
|||
// Set up the region mock that redirects compactions.
|
||||
HRegion r = mock(HRegion.class);
|
||||
when(
|
||||
r.compact(any(CompactionContext.class), any(Store.class),
|
||||
r.compact(any(CompactionContext.class), any(HStore.class),
|
||||
any(ThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
|
||||
@Override
|
||||
public Boolean answer(InvocationOnMock invocation) throws Throwable {
|
||||
|
@ -568,7 +566,7 @@ public class TestCompaction {
|
|||
// Set up store mocks for 2 "real" stores and the one we use for blocking CST.
|
||||
ArrayList<Integer> results = new ArrayList<>();
|
||||
StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
|
||||
Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
|
||||
HStore store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
|
||||
BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
|
||||
|
||||
// First, block the compaction thread so that we could muck with queue.
|
||||
|
@ -691,24 +689,20 @@ public class TestCompaction {
|
|||
}
|
||||
|
||||
/**
|
||||
* Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes.
|
||||
* Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
|
||||
* finishes.
|
||||
*/
|
||||
public static class TrackableCompactionRequest extends CompactionRequest {
|
||||
private CountDownLatch done;
|
||||
public static class Tracker implements CompactionLifeCycleTracker {
|
||||
|
||||
/**
|
||||
* Constructor for a custom compaction. Uses the setXXX methods to update the state of the
|
||||
* compaction before being used.
|
||||
*/
|
||||
public TrackableCompactionRequest(CountDownLatch finished) {
|
||||
super();
|
||||
this.done = finished;
|
||||
private final CountDownLatch done;
|
||||
|
||||
public Tracker(CountDownLatch done) {
|
||||
this.done = done;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterExecute() {
|
||||
super.afterExecute();
|
||||
this.done.countDown();
|
||||
public void afterExecute(Store store) {
|
||||
done.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Date;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
|
@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
|
|||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
@ -536,8 +538,9 @@ public class TestHMobStore {
|
|||
|
||||
// Trigger major compaction
|
||||
this.store.triggerMajorCompaction();
|
||||
CompactionContext requestCompaction = this.store.requestCompaction(1, null);
|
||||
this.store.compact(requestCompaction, NoLimitThroughputController.INSTANCE, null);
|
||||
Optional<CompactionContext> requestCompaction =
|
||||
this.store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
|
||||
this.store.compact(requestCompaction.get(), NoLimitThroughputController.INSTANCE, null);
|
||||
Assert.assertEquals(1, this.store.getStorefiles().size());
|
||||
|
||||
//Check encryption after compaction
|
||||
|
|
|
@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
|
@ -254,7 +255,8 @@ public class TestHRegionServerBulkLoad {
|
|||
static int sleepDuration;
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
|
||||
InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException {
|
||||
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
|
||||
throws IOException {
|
||||
try {
|
||||
Thread.sleep(sleepDuration);
|
||||
} catch (InterruptedException ie) {
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
|
||||
|
@ -417,7 +418,7 @@ public class TestMajorCompaction {
|
|||
}
|
||||
store.triggerMajorCompaction();
|
||||
|
||||
CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest();
|
||||
CompactionRequest request = store.requestCompaction().get().getRequest();
|
||||
assertNotNull("Expected to receive a compaction request", request);
|
||||
assertEquals(
|
||||
"System-requested major compaction should not occur if there are too many store files",
|
||||
|
@ -436,7 +437,9 @@ public class TestMajorCompaction {
|
|||
createStoreFile(r);
|
||||
}
|
||||
store.triggerMajorCompaction();
|
||||
CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest();
|
||||
CompactionRequest request =
|
||||
store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null).get()
|
||||
.getRequest();
|
||||
assertNotNull("Expected to receive a compaction request", request);
|
||||
assertEquals(
|
||||
"User-requested major compaction should always occur, even if there are too many store files",
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.io.InterruptedIOException;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -70,7 +71,11 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
|
|||
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
|
||||
import org.apache.hadoop.hbase.master.*;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||
import org.apache.hadoop.hbase.master.MasterRpcServices;
|
||||
import org.apache.hadoop.hbase.master.NoSuchProcedureException;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
||||
|
@ -234,7 +239,7 @@ public class TestSplitTransactionOnCluster {
|
|||
assertEquals(1, cluster.getRegions(tableName).size());
|
||||
|
||||
HRegion region = cluster.getRegions(tableName).get(0);
|
||||
Store store = region.getStore(cf);
|
||||
HStore store = region.getStore(cf);
|
||||
int regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName());
|
||||
HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
|
||||
|
||||
|
@ -246,8 +251,8 @@ public class TestSplitTransactionOnCluster {
|
|||
int fileNum = store.getStorefiles().size();
|
||||
// 0, Compaction Request
|
||||
store.triggerMajorCompaction();
|
||||
CompactionContext cc = store.requestCompaction();
|
||||
assertNotNull(cc);
|
||||
Optional<CompactionContext> cc = store.requestCompaction();
|
||||
assertTrue(cc.isPresent());
|
||||
// 1, A timeout split
|
||||
// 1.1 close region
|
||||
assertEquals(2, region.close(false).get(cf).size());
|
||||
|
@ -255,7 +260,7 @@ public class TestSplitTransactionOnCluster {
|
|||
region.initialize();
|
||||
|
||||
// 2, Run Compaction cc
|
||||
assertFalse(region.compact(cc, store, NoLimitThroughputController.INSTANCE));
|
||||
assertFalse(region.compact(cc.get(), store, NoLimitThroughputController.INSTANCE));
|
||||
assertTrue(fileNum > store.getStorefiles().size());
|
||||
|
||||
// 3, Split
|
||||
|
|
|
@ -117,7 +117,7 @@ public class TestSplitWalDataLoss {
|
|||
}
|
||||
}).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
|
||||
Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
|
||||
Matchers.<Collection<Store>> any());
|
||||
Matchers.<Collection<HStore>> any());
|
||||
// Find region key; don't pick up key for hbase:meta by mistake.
|
||||
String key = null;
|
||||
for (Map.Entry<String, Region> entry: rs.onlineRegions.entrySet()) {
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
@ -47,7 +46,6 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -74,6 +72,8 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
|
@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
|||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -105,12 +106,6 @@ import org.junit.experimental.categories.Category;
|
|||
import org.junit.rules.TestName;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test class for the Store
|
||||
*/
|
||||
|
@ -371,7 +366,7 @@ public class TestStore {
|
|||
// There will be no compaction due to threshold above. Last file will not be replaced.
|
||||
for (int i = 1; i <= storeFileNum - 1; i++) {
|
||||
// verify the expired store file.
|
||||
assertNull(this.store.requestCompaction());
|
||||
assertFalse(this.store.requestCompaction().isPresent());
|
||||
Collection<StoreFile> sfs = this.store.getStorefiles();
|
||||
// Ensure i files are gone.
|
||||
if (minVersions == 0) {
|
||||
|
@ -386,7 +381,7 @@ public class TestStore {
|
|||
// Let the next store file expired.
|
||||
edge.incrementTime(sleepTime);
|
||||
}
|
||||
assertNull(this.store.requestCompaction());
|
||||
assertFalse(this.store.requestCompaction().isPresent());
|
||||
|
||||
Collection<StoreFile> sfs = this.store.getStorefiles();
|
||||
// Assert the last expired file is not removed.
|
||||
|
@ -422,7 +417,7 @@ public class TestStore {
|
|||
Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
|
||||
|
||||
// after compact; check the lowest time stamp
|
||||
store.compact(store.requestCompaction(), NoLimitThroughputController.INSTANCE, null);
|
||||
store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
|
||||
lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
|
||||
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
|
||||
Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
|
||||
|
|
|
@ -823,12 +823,12 @@ public abstract class AbstractTestWALReplay {
|
|||
final HRegion region =
|
||||
new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
|
||||
@Override
|
||||
protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
|
||||
final Collection<Store> storesToFlush, MonitoredTask status,
|
||||
protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
|
||||
final Collection<HStore> storesToFlush, MonitoredTask status,
|
||||
boolean writeFlushWalMarker)
|
||||
throws IOException {
|
||||
LOG.info("InternalFlushCache Invoked");
|
||||
FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
|
||||
FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
|
||||
Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
|
||||
flushcount.incrementAndGet();
|
||||
return fs;
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
|
|||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -263,7 +264,7 @@ public class TestCoprocessorScanPolicy {
|
|||
public InternalScanner preCompactScannerOpen(
|
||||
final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
|
||||
InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
|
||||
InternalScanner s,CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
|
||||
Long newTtl = ttls.get(store.getTableName());
|
||||
Integer newVersions = versions.get(store.getTableName());
|
||||
ScanInfo oldSI = store.getScanInfo();
|
||||
|
|
Loading…
Reference in New Issue