HBASE-18989 Polish the compaction related CP hooks
This commit is contained in:
parent
fad75f07aa
commit
4c43ef2683
|
@ -158,7 +158,7 @@ public interface RegionObserver {
|
||||||
/**
|
/**
|
||||||
* Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
|
* Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
|
||||||
* available candidates. To alter the files used for compaction, you may mutate the passed in list
|
* available candidates. To alter the files used for compaction, you may mutate the passed in list
|
||||||
* of candidates.
|
* of candidates. If you remove all the candidates then the compaction will be canceled.
|
||||||
* @param c the environment provided by the region server
|
* @param c the environment provided by the region server
|
||||||
* @param store the store where compaction is being requested
|
* @param store the store where compaction is being requested
|
||||||
* @param candidates the store files currently available for compaction
|
* @param candidates the store files currently available for compaction
|
||||||
|
@ -183,18 +183,12 @@ public interface RegionObserver {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
|
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
|
||||||
* {@code StoreFile}. To override or modify the compaction process, implementing classes have two
|
* {@code StoreFile}.
|
||||||
* options:
|
* <p>
|
||||||
* <ul>
|
* To override or modify the compaction process, implementing classes can wrap the provided
|
||||||
* <li>Wrap the provided {@link InternalScanner} with a custom implementation that is returned
|
* {@link InternalScanner} with a custom implementation that is returned from this method. The
|
||||||
* from this method. The custom scanner can then inspect
|
* custom scanner can then inspect {@link org.apache.hadoop.hbase.Cell}s from the wrapped scanner,
|
||||||
* {@link org.apache.hadoop.hbase.KeyValue}s from the wrapped scanner, applying its own
|
* applying its own policy to what gets written.
|
||||||
* policy to what gets written.</li>
|
|
||||||
* <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a
|
|
||||||
* custom implementation for writing of new {@link StoreFile}s. <strong>Note: any implementations
|
|
||||||
* bypassing core compaction using this approach must write out new store files themselves or the
|
|
||||||
* existing data will no longer be available after compaction.</strong></li>
|
|
||||||
* </ul>
|
|
||||||
* @param c the environment provided by the region server
|
* @param c the environment provided by the region server
|
||||||
* @param store the store being compacted
|
* @param store the store being compacted
|
||||||
* @param scanner the scanner over existing data used in the store file rewriting
|
* @param scanner the scanner over existing data used in the store file rewriting
|
||||||
|
@ -206,8 +200,7 @@ public interface RegionObserver {
|
||||||
*/
|
*/
|
||||||
default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||||
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
|
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
|
||||||
CompactionRequest request)
|
CompactionRequest request) throws IOException {
|
||||||
throws IOException {
|
|
||||||
return scanner;
|
return scanner;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,8 @@ import java.util.concurrent.RejectedExecutionHandler;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.IntSupplier;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -44,6 +46,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
|
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
@ -60,7 +63,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||||
* Compact region on request and then run split if appropriate
|
* Compact region on request and then run split if appropriate
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class CompactSplit implements PropagatingConfigurationObserver {
|
public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
|
||||||
private static final Log LOG = LogFactory.getLog(CompactSplit.class);
|
private static final Log LOG = LogFactory.getLog(CompactSplit.class);
|
||||||
|
|
||||||
// Configuration key for the large compaction threads.
|
// Configuration key for the large compaction threads.
|
||||||
|
@ -99,7 +102,6 @@ public class CompactSplit implements PropagatingConfigurationObserver {
|
||||||
|
|
||||||
/** @param server */
|
/** @param server */
|
||||||
CompactSplit(HRegionServer server) {
|
CompactSplit(HRegionServer server) {
|
||||||
super();
|
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.conf = server.getConfiguration();
|
this.conf = server.getConfiguration();
|
||||||
this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
|
this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
|
||||||
|
@ -235,14 +237,68 @@ public class CompactSplit implements PropagatingConfigurationObserver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void requestCompaction(HRegion region, String why, int priority,
|
// A compaction life cycle tracker to trace the execution of all the compactions triggered by one
|
||||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
// request and delegate to the source CompactionLifeCycleTracker. It will call completed method if
|
||||||
requestCompactionInternal(region, why, priority, true, tracker, user);
|
// all the compactions are finished.
|
||||||
|
private static final class AggregatingCompactionLifeCycleTracker
|
||||||
|
implements CompactionLifeCycleTracker {
|
||||||
|
|
||||||
|
private final CompactionLifeCycleTracker tracker;
|
||||||
|
|
||||||
|
private final AtomicInteger remaining;
|
||||||
|
|
||||||
|
public AggregatingCompactionLifeCycleTracker(CompactionLifeCycleTracker tracker,
|
||||||
|
int numberOfStores) {
|
||||||
|
this.tracker = tracker;
|
||||||
|
this.remaining = new AtomicInteger(numberOfStores);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void tryCompleted() {
|
||||||
|
if (remaining.decrementAndGet() == 0) {
|
||||||
|
tracker.completed();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void notExecuted(Store store, String reason) {
|
||||||
|
tracker.notExecuted(store, reason);
|
||||||
|
tryCompleted();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeExecution(Store store) {
|
||||||
|
tracker.beforeExecution(store);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterExecution(Store store) {
|
||||||
|
tracker.afterExecution(store);
|
||||||
|
tryCompleted();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private CompactionLifeCycleTracker wrap(CompactionLifeCycleTracker tracker,
|
||||||
|
IntSupplier numberOfStores) {
|
||||||
|
if (tracker == CompactionLifeCycleTracker.DUMMY) {
|
||||||
|
// a simple optimization to avoid creating unnecessary objects as usually we do not care about
|
||||||
|
// the life cycle of a compaction.
|
||||||
|
return tracker;
|
||||||
|
} else {
|
||||||
|
return new AggregatingCompactionLifeCycleTracker(tracker, numberOfStores.getAsInt());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void requestCompaction(HRegion region, String why, int priority,
|
||||||
|
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||||
|
requestCompactionInternal(region, why, priority, true,
|
||||||
|
wrap(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
|
public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
|
||||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||||
requestCompactionInternal(region, store, why, priority, true, tracker, user);
|
requestCompactionInternal(region, store, why, priority, true, wrap(tracker, () -> 1), user);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void requestCompactionInternal(HRegion region, String why, int priority,
|
private void requestCompactionInternal(HRegion region, String why, int priority,
|
||||||
|
@ -259,6 +315,17 @@ public class CompactSplit implements PropagatingConfigurationObserver {
|
||||||
!region.getTableDescriptor().isCompactionEnabled())) {
|
!region.getTableDescriptor().isCompactionEnabled())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
RegionServerSpaceQuotaManager spaceQuotaManager =
|
||||||
|
this.server.getRegionServerSpaceQuotaManager();
|
||||||
|
if (spaceQuotaManager != null &&
|
||||||
|
spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
|
||||||
|
String reason = "Ignoring compaction request for " + region +
|
||||||
|
" as an active space quota violation " + " policy disallows compactions.";
|
||||||
|
tracker.notExecuted(store, reason);
|
||||||
|
LOG.debug(reason);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
Optional<CompactionContext> compaction;
|
Optional<CompactionContext> compaction;
|
||||||
if (selectNow) {
|
if (selectNow) {
|
||||||
compaction = selectCompaction(region, store, priority, tracker, user);
|
compaction = selectCompaction(region, store, priority, tracker, user);
|
||||||
|
@ -270,17 +337,6 @@ public class CompactSplit implements PropagatingConfigurationObserver {
|
||||||
compaction = Optional.empty();
|
compaction = Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
RegionServerSpaceQuotaManager spaceQuotaManager =
|
|
||||||
this.server.getRegionServerSpaceQuotaManager();
|
|
||||||
if (spaceQuotaManager != null &&
|
|
||||||
spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Ignoring compaction request for " + region +
|
|
||||||
" as an active space quota violation " + " policy disallows compactions.");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
ThreadPoolExecutor pool;
|
ThreadPoolExecutor pool;
|
||||||
if (selectNow) {
|
if (selectNow) {
|
||||||
// compaction.get is safe as we will just return if selectNow is true but no compaction is
|
// compaction.get is safe as we will just return if selectNow is true but no compaction is
|
||||||
|
@ -315,9 +371,11 @@ public class CompactSplit implements PropagatingConfigurationObserver {
|
||||||
private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
|
private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
|
||||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
||||||
Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
|
Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
|
||||||
if (!compaction.isPresent() && LOG.isDebugEnabled() && region.getRegionInfo() != null) {
|
if (!compaction.isPresent() && region.getRegionInfo() != null) {
|
||||||
LOG.debug("Not compacting " + region.getRegionInfo().getRegionNameAsString() +
|
String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() +
|
||||||
" because compaction request was cancelled");
|
" because compaction request was cancelled";
|
||||||
|
tracker.notExecuted(store, reason);
|
||||||
|
LOG.debug(reason);
|
||||||
}
|
}
|
||||||
return compaction;
|
return compaction;
|
||||||
}
|
}
|
||||||
|
@ -454,7 +512,6 @@ public class CompactSplit implements PropagatingConfigurationObserver {
|
||||||
|
|
||||||
public CompactionRunner(HStore store, HRegion region, Optional<CompactionContext> compaction,
|
public CompactionRunner(HStore store, HRegion region, Optional<CompactionContext> compaction,
|
||||||
ThreadPoolExecutor parent, User user) {
|
ThreadPoolExecutor parent, User user) {
|
||||||
super();
|
|
||||||
this.store = store;
|
this.store = store;
|
||||||
this.region = region;
|
this.region = region;
|
||||||
this.compaction = compaction;
|
this.compaction = compaction;
|
||||||
|
@ -462,7 +519,7 @@ public class CompactSplit implements PropagatingConfigurationObserver {
|
||||||
: store.getCompactPriority();
|
: store.getCompactPriority();
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
this.user = user;
|
this.user = user;
|
||||||
this.time = System.currentTimeMillis();
|
this.time = EnvironmentEdgeManager.currentTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -520,7 +577,7 @@ public class CompactSplit implements PropagatingConfigurationObserver {
|
||||||
// Finally we can compact something.
|
// Finally we can compact something.
|
||||||
assert c != null;
|
assert c != null;
|
||||||
|
|
||||||
c.getRequest().getTracker().beforeExecute(store);
|
c.getRequest().getTracker().beforeExecution(store);
|
||||||
try {
|
try {
|
||||||
// Note: please don't put single-compaction logic here;
|
// Note: please don't put single-compaction logic here;
|
||||||
// put it into region/store/etc. This is CST logic.
|
// put it into region/store/etc. This is CST logic.
|
||||||
|
@ -553,7 +610,7 @@ public class CompactSplit implements PropagatingConfigurationObserver {
|
||||||
region.reportCompactionRequestFailure();
|
region.reportCompactionRequestFailure();
|
||||||
server.checkFileSystem();
|
server.checkFileSystem();
|
||||||
} finally {
|
} finally {
|
||||||
c.getRequest().getTracker().afterExecute(store);
|
c.getRequest().getTracker().afterExecution(store);
|
||||||
region.decrementCompactionsQueuedCount();
|
region.decrementCompactionsQueuedCount();
|
||||||
LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
|
LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1952,11 +1952,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
protected void doRegionCompactionPrep() throws IOException {
|
protected void doRegionCompactionPrep() throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void triggerMajorCompaction() throws IOException {
|
|
||||||
stores.values().forEach(HStore::triggerMajorCompaction);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Synchronously compact all stores in the region.
|
* Synchronously compact all stores in the region.
|
||||||
* <p>This operation could block for a long time, so don't call it from a
|
* <p>This operation could block for a long time, so don't call it from a
|
||||||
|
@ -1972,7 +1967,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
*/
|
*/
|
||||||
public void compact(boolean majorCompaction) throws IOException {
|
public void compact(boolean majorCompaction) throws IOException {
|
||||||
if (majorCompaction) {
|
if (majorCompaction) {
|
||||||
triggerMajorCompaction();
|
stores.values().forEach(HStore::triggerMajorCompaction);
|
||||||
}
|
}
|
||||||
for (HStore s : stores.values()) {
|
for (HStore s : stores.values()) {
|
||||||
Optional<CompactionContext> compaction = s.requestCompaction();
|
Optional<CompactionContext> compaction = s.requestCompaction();
|
||||||
|
@ -8212,16 +8207,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void requestCompaction(String why, int priority, CompactionLifeCycleTracker tracker,
|
public void requestCompaction(String why, int priority, boolean major,
|
||||||
User user) throws IOException {
|
CompactionLifeCycleTracker tracker) throws IOException {
|
||||||
((HRegionServer) rsServices).compactSplitThread.requestCompaction(this, why, priority, tracker,
|
if (major) {
|
||||||
user);
|
stores.values().forEach(HStore::triggerMajorCompaction);
|
||||||
|
}
|
||||||
|
rsServices.getCompactionRequestor().requestCompaction(this, why, priority, tracker,
|
||||||
|
RpcServer.getRequestUser().orElse(null));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void requestCompaction(byte[] family, String why, int priority,
|
public void requestCompaction(byte[] family, String why, int priority, boolean major,
|
||||||
CompactionLifeCycleTracker tracker, User user) throws IOException {
|
CompactionLifeCycleTracker tracker) throws IOException {
|
||||||
((HRegionServer) rsServices).compactSplitThread.requestCompaction(this,
|
HStore store = stores.get(family);
|
||||||
Preconditions.checkNotNull(stores.get(family)), why, priority, tracker, user);
|
if (store == null) {
|
||||||
|
throw new NoSuchColumnFamilyException("column family " + Bytes.toString(family) +
|
||||||
|
" does not exist in region " + getRegionInfo().getRegionNameAsString());
|
||||||
|
}
|
||||||
|
if (major) {
|
||||||
|
store.triggerMajorCompaction();
|
||||||
|
}
|
||||||
|
rsServices.getCompactionRequestor().requestCompaction(this, store, why, priority, tracker,
|
||||||
|
RpcServer.getRequestUser().orElse(null));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
|
||||||
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
|
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
|
||||||
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
|
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
|
||||||
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
|
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
|
||||||
|
@ -1686,9 +1687,9 @@ public class HRegionServer extends HasThread implements
|
||||||
int totalStaticBloomSizeKB = 0;
|
int totalStaticBloomSizeKB = 0;
|
||||||
long totalCompactingKVs = 0;
|
long totalCompactingKVs = 0;
|
||||||
long currentCompactedKVs = 0;
|
long currentCompactedKVs = 0;
|
||||||
List<? extends Store> storeList = r.getStores();
|
List<HStore> storeList = r.getStores();
|
||||||
stores += storeList.size();
|
stores += storeList.size();
|
||||||
for (Store store : storeList) {
|
for (HStore store : storeList) {
|
||||||
storefiles += store.getStorefilesCount();
|
storefiles += store.getStorefilesCount();
|
||||||
storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
|
storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
|
||||||
storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
|
storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
|
||||||
|
@ -2779,6 +2780,11 @@ public class HRegionServer extends HasThread implements
|
||||||
return this.cacheFlusher;
|
return this.cacheFlusher;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompactionRequester getCompactionRequestor() {
|
||||||
|
return this.compactSplitThread;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the top N most loaded regions this server is serving so we can tell the
|
* Get the top N most loaded regions this server is serving so we can tell the
|
||||||
* master which regions it can reallocate if we're overloaded. TODO: actually
|
* master which regions it can reallocate if we're overloaded. TODO: actually
|
||||||
|
|
|
@ -1625,7 +1625,10 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
|
return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
/**
|
||||||
|
* getter for CompactionProgress object
|
||||||
|
* @return CompactionProgress object; can be null
|
||||||
|
*/
|
||||||
public CompactionProgress getCompactionProgress() {
|
public CompactionProgress getCompactionProgress() {
|
||||||
return this.storeEngine.getCompactor().getProgress();
|
return this.storeEngine.getCompactor().getProgress();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1533,7 +1533,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
* @throws ServiceException
|
* @throws ServiceException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@QosPriority(priority=HConstants.ADMIN_QOS)
|
@QosPriority(priority = HConstants.ADMIN_QOS)
|
||||||
public CompactRegionResponse compactRegion(final RpcController controller,
|
public CompactRegionResponse compactRegion(final RpcController controller,
|
||||||
final CompactRegionRequest request) throws ServiceException {
|
final CompactRegionRequest request) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
|
@ -1551,41 +1551,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
region.startRegionOperation(Operation.COMPACT_REGION);
|
region.startRegionOperation(Operation.COMPACT_REGION);
|
||||||
LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
|
LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
|
||||||
boolean major = false;
|
boolean major = request.hasMajor() && request.getMajor();
|
||||||
byte [] family = null;
|
|
||||||
HStore store = null;
|
|
||||||
if (request.hasFamily()) {
|
if (request.hasFamily()) {
|
||||||
family = request.getFamily().toByteArray();
|
byte[] family = request.getFamily().toByteArray();
|
||||||
store = region.getStore(family);
|
String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
|
||||||
if (store == null) {
|
region.getRegionInfo().getRegionNameAsString() + " and family " +
|
||||||
throw new ServiceException(new DoNotRetryIOException("column family " +
|
Bytes.toString(family);
|
||||||
Bytes.toString(family) + " does not exist in region " +
|
LOG.trace(log);
|
||||||
region.getRegionInfo().getRegionNameAsString()));
|
region.requestCompaction(family, log, Store.PRIORITY_USER, major,
|
||||||
}
|
CompactionLifeCycleTracker.DUMMY);
|
||||||
}
|
|
||||||
if (request.hasMajor()) {
|
|
||||||
major = request.getMajor();
|
|
||||||
}
|
|
||||||
if (major) {
|
|
||||||
if (family != null) {
|
|
||||||
store.triggerMajorCompaction();
|
|
||||||
} else {
|
|
||||||
region.triggerMajorCompaction();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("User-triggered compaction requested for region "
|
|
||||||
+ region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
|
|
||||||
}
|
|
||||||
String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
|
|
||||||
if (family != null) {
|
|
||||||
regionServer.compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER,
|
|
||||||
CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser().orElse(null));
|
|
||||||
} else {
|
} else {
|
||||||
regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER,
|
String log = "User-triggered " + (major ? "major " : "") + "compaction for region " +
|
||||||
CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser().orElse(null));
|
region.getRegionInfo().getRegionNameAsString();
|
||||||
|
LOG.trace(log);
|
||||||
|
region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY);
|
||||||
}
|
}
|
||||||
return CompactRegionResponse.newBuilder().build();
|
return CompactRegionResponse.newBuilder().build();
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.CompactionState;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Increment;
|
import org.apache.hadoop.hbase.client.Increment;
|
||||||
import org.apache.hadoop.hbase.client.IsolationLevel;
|
|
||||||
import org.apache.hadoop.hbase.client.Mutation;
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
@ -43,7 +42,6 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
|
||||||
|
@ -454,15 +452,6 @@ public interface Region extends ConfigurationObserver {
|
||||||
// Flushes, compactions, splits, etc.
|
// Flushes, compactions, splits, etc.
|
||||||
// Wizards only, please
|
// Wizards only, please
|
||||||
|
|
||||||
/**
|
|
||||||
* Trigger major compaction on all stores in the region.
|
|
||||||
* <p>
|
|
||||||
* Compaction will be performed asynchronously to this call by the RegionServer's
|
|
||||||
* CompactSplitThread.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
void triggerMajorCompaction() throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return if a given region is in compaction now.
|
* @return if a given region is in compaction now.
|
||||||
*/
|
*/
|
||||||
|
@ -471,12 +460,12 @@ public interface Region extends ConfigurationObserver {
|
||||||
/**
|
/**
|
||||||
* Request compaction on this region.
|
* Request compaction on this region.
|
||||||
*/
|
*/
|
||||||
void requestCompaction(String why, int priority, CompactionLifeCycleTracker tracker, User user)
|
void requestCompaction(String why, int priority, boolean major,
|
||||||
throws IOException;
|
CompactionLifeCycleTracker tracker) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Request compaction for the given family
|
* Request compaction for the given family
|
||||||
*/
|
*/
|
||||||
void requestCompaction(byte[] family, String why, int priority,
|
void requestCompaction(byte[] family, String why, int priority, boolean major,
|
||||||
CompactionLifeCycleTracker tracker, User user) throws IOException;
|
CompactionLifeCycleTracker tracker) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||||
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
|
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
|
||||||
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -59,10 +60,17 @@ public interface RegionServerServices extends Server, OnlineRegions, FavoredNode
|
||||||
List<WAL> getWALs() throws IOException;
|
List<WAL> getWALs() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Implementation of {@link FlushRequester} or null.
|
* @return Implementation of {@link FlushRequester} or null. Usually it will not be null unless
|
||||||
|
* during intialization.
|
||||||
*/
|
*/
|
||||||
FlushRequester getFlushRequester();
|
FlushRequester getFlushRequester();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Implementation of {@link CompactionRequester} or null. Usually it will not be null
|
||||||
|
* unless during intialization.
|
||||||
|
*/
|
||||||
|
CompactionRequester getCompactionRequestor();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the RegionServerAccounting for this Region Server
|
* @return the RegionServerAccounting for this Region Server
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
|
||||||
|
@ -61,12 +60,6 @@ public interface Store {
|
||||||
|
|
||||||
FileSystem getFileSystem();
|
FileSystem getFileSystem();
|
||||||
|
|
||||||
/**
|
|
||||||
* getter for CompactionProgress object
|
|
||||||
* @return CompactionProgress object; can be null
|
|
||||||
*/
|
|
||||||
CompactionProgress getCompactionProgress();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests whether we should run a major compaction. For example, if the configured major compaction
|
* Tests whether we should run a major compaction. For example, if the configured major compaction
|
||||||
* interval is reached.
|
* interval is reached.
|
||||||
|
|
|
@ -32,13 +32,19 @@ public interface CompactionLifeCycleTracker {
|
||||||
static CompactionLifeCycleTracker DUMMY = new CompactionLifeCycleTracker() {
|
static CompactionLifeCycleTracker DUMMY = new CompactionLifeCycleTracker() {
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called if the compaction request is failed for some reason.
|
||||||
|
*/
|
||||||
|
default void notExecuted(Store store, String reason) {
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called before compaction is executed by CompactSplitThread.
|
* Called before compaction is executed by CompactSplitThread.
|
||||||
* <p>
|
* <p>
|
||||||
* Requesting compaction on a region can lead to multiple compactions on different stores, so we
|
* Requesting compaction on a region can lead to multiple compactions on different stores, so we
|
||||||
* will pass the {@link Store} in to tell you the store we operate on.
|
* will pass the {@link Store} in to tell you the store we operate on.
|
||||||
*/
|
*/
|
||||||
default void beforeExecute(Store store) {
|
default void beforeExecution(Store store) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -47,6 +53,15 @@ public interface CompactionLifeCycleTracker {
|
||||||
* Requesting compaction on a region can lead to multiple compactions on different stores, so we
|
* Requesting compaction on a region can lead to multiple compactions on different stores, so we
|
||||||
* will pass the {@link Store} in to tell you the store we operate on.
|
* will pass the {@link Store} in to tell you the store we operate on.
|
||||||
*/
|
*/
|
||||||
default void afterExecute(Store store) {
|
default void afterExecution(Store store) {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after all the requested compactions are completed.
|
||||||
|
* <p>
|
||||||
|
* The compaction scheduling is per Store so if you request a compaction on a region it may lead
|
||||||
|
* to multiple compactions.
|
||||||
|
*/
|
||||||
|
default void completed() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request a compaction.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public interface CompactionRequester {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request compaction on all the stores of the given region.
|
||||||
|
*/
|
||||||
|
void requestCompaction(HRegion region, String why, int priority,
|
||||||
|
CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request compaction on the given store.
|
||||||
|
*/
|
||||||
|
void requestCompaction(HRegion region, HStore store, String why, int priority,
|
||||||
|
CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
|
||||||
|
}
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
|
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
|
||||||
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
|
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
|
@ -150,6 +151,11 @@ public class MockRegionServerServices implements RegionServerServices {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompactionRequester getCompactionRequestor() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClusterConnection getConnection() {
|
public ClusterConnection getConnection() {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
|
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
|
||||||
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
|
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
|
@ -219,7 +220,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isStopped() {
|
public boolean isStopped() {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,18 +264,15 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addRegion(HRegion r) {
|
public void addRegion(HRegion r) {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean removeRegion(HRegion r, ServerName destination) {
|
public boolean removeRegion(HRegion r, ServerName destination) {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HRegion getRegion(String encodedRegionName) {
|
public HRegion getRegion(String encodedRegionName) {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -316,13 +313,14 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FlushRequester getFlushRequester() {
|
public FlushRequester getFlushRequester() {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
|
public CompactionRequester getCompactionRequestor() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public RegionServerAccounting getRegionServerAccounting() {
|
public RegionServerAccounting getRegionServerAccounting() {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -334,24 +332,20 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
@Override
|
@Override
|
||||||
public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException,
|
public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException,
|
||||||
IOException {
|
IOException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RpcServerInterface getRpcServer() {
|
public RpcServerInterface getRpcServer() {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
|
public ConcurrentSkipListMap<byte[], Boolean> getRegionsInTransitionInRS() {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FileSystem getFileSystem() {
|
public FileSystem getFileSystem() {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -371,7 +365,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
@Override
|
@Override
|
||||||
public MutateResponse mutate(RpcController controller, MutateRequest request)
|
public MutateResponse mutate(RpcController controller, MutateRequest request)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,7 +403,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
@Override
|
@Override
|
||||||
public BulkLoadHFileResponse bulkLoadHFile(RpcController controller,
|
public BulkLoadHFileResponse bulkLoadHFile(RpcController controller,
|
||||||
BulkLoadHFileRequest request) throws ServiceException {
|
BulkLoadHFileRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -423,7 +415,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
@Override
|
@Override
|
||||||
public ClientProtos.MultiResponse multi(
|
public ClientProtos.MultiResponse multi(
|
||||||
RpcController controller, MultiRequest request) throws ServiceException {
|
RpcController controller, MultiRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -451,14 +442,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
@Override
|
@Override
|
||||||
public GetStoreFileResponse getStoreFile(RpcController controller,
|
public GetStoreFileResponse getStoreFile(RpcController controller,
|
||||||
GetStoreFileRequest request) throws ServiceException {
|
GetStoreFileRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
|
public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
|
||||||
GetOnlineRegionRequest request) throws ServiceException {
|
GetOnlineRegionRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -470,74 +459,63 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
@Override
|
@Override
|
||||||
public OpenRegionResponse openRegion(RpcController controller,
|
public OpenRegionResponse openRegion(RpcController controller,
|
||||||
OpenRegionRequest request) throws ServiceException {
|
OpenRegionRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public WarmupRegionResponse warmupRegion(RpcController controller,
|
public WarmupRegionResponse warmupRegion(RpcController controller,
|
||||||
WarmupRegionRequest request) throws ServiceException {
|
WarmupRegionRequest request) throws ServiceException {
|
||||||
//TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public CloseRegionResponse closeRegion(RpcController controller,
|
public CloseRegionResponse closeRegion(RpcController controller,
|
||||||
CloseRegionRequest request) throws ServiceException {
|
CloseRegionRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FlushRegionResponse flushRegion(RpcController controller,
|
public FlushRegionResponse flushRegion(RpcController controller,
|
||||||
FlushRegionRequest request) throws ServiceException {
|
FlushRegionRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompactRegionResponse compactRegion(RpcController controller,
|
public CompactRegionResponse compactRegion(RpcController controller,
|
||||||
CompactRegionRequest request) throws ServiceException {
|
CompactRegionRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
|
public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
|
||||||
ReplicateWALEntryRequest request) throws ServiceException {
|
ReplicateWALEntryRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RollWALWriterResponse rollWALWriter(RpcController controller,
|
public RollWALWriterResponse rollWALWriter(RpcController controller,
|
||||||
RollWALWriterRequest request) throws ServiceException {
|
RollWALWriterRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetServerInfoResponse getServerInfo(RpcController controller,
|
public GetServerInfoResponse getServerInfo(RpcController controller,
|
||||||
GetServerInfoRequest request) throws ServiceException {
|
GetServerInfoRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public StopServerResponse stopServer(RpcController controller,
|
public StopServerResponse stopServer(RpcController controller,
|
||||||
StopServerRequest request) throws ServiceException {
|
StopServerRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Region> getRegions(TableName tableName) throws IOException {
|
public List<Region> getRegions(TableName tableName) throws IOException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Leases getLeases() {
|
public Leases getLeases() {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -575,13 +553,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
public ReplicateWALEntryResponse
|
public ReplicateWALEntryResponse
|
||||||
replay(RpcController controller, ReplicateWALEntryRequest request)
|
replay(RpcController controller, ReplicateWALEntryRequest request)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, HRegion> getRecoveringRegions() {
|
public Map<String, HRegion> getRecoveringRegions() {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -603,14 +579,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean registerService(com.google.protobuf.Service service) {
|
public boolean registerService(com.google.protobuf.Service service) {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CoprocessorServiceResponse execRegionServerService(RpcController controller,
|
public CoprocessorServiceResponse execRegionServerService(RpcController controller,
|
||||||
CoprocessorServiceRequest request) throws ServiceException {
|
CoprocessorServiceRequest request) throws ServiceException {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -706,7 +706,7 @@ public class TestCompaction {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterExecute(Store store) {
|
public void afterExecution(Store store) {
|
||||||
done.countDown();
|
done.countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,267 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
|
||||||
|
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Confirm that the function of CompactionLifeCycleTracker is OK as we do not use it in our own
|
||||||
|
* code.
|
||||||
|
*/
|
||||||
|
@Category({ CoprocessorTests.class, MediumTests.class })
|
||||||
|
public class TestCompactionLifeCycleTracker {
|
||||||
|
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static final TableName NAME =
|
||||||
|
TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName());
|
||||||
|
|
||||||
|
private static final byte[] CF1 = Bytes.toBytes("CF1");
|
||||||
|
|
||||||
|
private static final byte[] CF2 = Bytes.toBytes("CF2");
|
||||||
|
|
||||||
|
private static final byte[] QUALIFIER = Bytes.toBytes("CQ");
|
||||||
|
|
||||||
|
private HRegion region;
|
||||||
|
|
||||||
|
private static CompactionLifeCycleTracker TRACKER = null;
|
||||||
|
|
||||||
|
// make sure that we pass the correct CompactionLifeCycleTracker to CP hooks.
|
||||||
|
public static final class CompactionObserver implements RegionObserver {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||||
|
List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker)
|
||||||
|
throws IOException {
|
||||||
|
if (TRACKER != null) {
|
||||||
|
assertSame(tracker, TRACKER);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||||
|
List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
|
||||||
|
CompactionRequest request) {
|
||||||
|
if (TRACKER != null) {
|
||||||
|
assertSame(tracker, TRACKER);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||||
|
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
|
||||||
|
CompactionRequest request) throws IOException {
|
||||||
|
if (TRACKER != null) {
|
||||||
|
assertSame(tracker, TRACKER);
|
||||||
|
}
|
||||||
|
return scanner;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||||
|
StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request)
|
||||||
|
throws IOException {
|
||||||
|
if (TRACKER != null) {
|
||||||
|
assertSame(tracker, TRACKER);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2);
|
||||||
|
UTIL.startMiniCluster(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
UTIL.getAdmin()
|
||||||
|
.createTable(TableDescriptorBuilder.newBuilder(NAME)
|
||||||
|
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1))
|
||||||
|
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2))
|
||||||
|
.addCoprocessor(CompactionObserver.class.getName()).build());
|
||||||
|
try (Table table = UTIL.getConnection().getTable(NAME)) {
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
table.put(new Put(Bytes.toBytes(i)).addImmutable(CF1, QUALIFIER, Bytes.toBytes(i)));
|
||||||
|
}
|
||||||
|
UTIL.getAdmin().flush(NAME);
|
||||||
|
for (int i = 100; i < 200; i++) {
|
||||||
|
table.put(new Put(Bytes.toBytes(i)).addImmutable(CF1, QUALIFIER, Bytes.toBytes(i)));
|
||||||
|
}
|
||||||
|
UTIL.getAdmin().flush(NAME);
|
||||||
|
}
|
||||||
|
region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
|
||||||
|
assertEquals(2, region.getStore(CF1).getStorefilesCount());
|
||||||
|
assertEquals(0, region.getStore(CF2).getStorefilesCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
region = null;
|
||||||
|
TRACKER = null;
|
||||||
|
UTIL.deleteTable(NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class Tracker implements CompactionLifeCycleTracker {
|
||||||
|
|
||||||
|
final List<Pair<Store, String>> notExecutedStores = new ArrayList<>();
|
||||||
|
|
||||||
|
final List<Store> beforeExecuteStores = new ArrayList<>();
|
||||||
|
|
||||||
|
final List<Store> afterExecuteStores = new ArrayList<>();
|
||||||
|
|
||||||
|
private boolean completed = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void notExecuted(Store store, String reason) {
|
||||||
|
notExecutedStores.add(Pair.newPair(store, reason));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beforeExecution(Store store) {
|
||||||
|
beforeExecuteStores.add(store);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterExecution(Store store) {
|
||||||
|
afterExecuteStores.add(store);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void completed() {
|
||||||
|
completed = true;
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void await() throws InterruptedException {
|
||||||
|
while (!completed) {
|
||||||
|
wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequestOnRegion() throws IOException, InterruptedException {
|
||||||
|
Tracker tracker = new Tracker();
|
||||||
|
TRACKER = tracker;
|
||||||
|
region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
|
||||||
|
tracker.await();
|
||||||
|
assertEquals(1, tracker.notExecutedStores.size());
|
||||||
|
assertEquals(Bytes.toString(CF2),
|
||||||
|
tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
|
||||||
|
assertThat(tracker.notExecutedStores.get(0).getSecond(),
|
||||||
|
containsString("compaction request was cancelled"));
|
||||||
|
|
||||||
|
assertEquals(1, tracker.beforeExecuteStores.size());
|
||||||
|
assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());
|
||||||
|
|
||||||
|
assertEquals(1, tracker.afterExecuteStores.size());
|
||||||
|
assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequestOnStore() throws IOException, InterruptedException {
|
||||||
|
Tracker tracker = new Tracker();
|
||||||
|
TRACKER = tracker;
|
||||||
|
region.requestCompaction(CF1, "test", Store.PRIORITY_USER, false, tracker);
|
||||||
|
tracker.await();
|
||||||
|
assertTrue(tracker.notExecutedStores.isEmpty());
|
||||||
|
assertEquals(1, tracker.beforeExecuteStores.size());
|
||||||
|
assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName());
|
||||||
|
assertEquals(1, tracker.afterExecuteStores.size());
|
||||||
|
assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName());
|
||||||
|
|
||||||
|
tracker = new Tracker();
|
||||||
|
TRACKER = tracker;
|
||||||
|
region.requestCompaction(CF2, "test", Store.PRIORITY_USER, false, tracker);
|
||||||
|
tracker.await();
|
||||||
|
assertEquals(1, tracker.notExecutedStores.size());
|
||||||
|
assertEquals(Bytes.toString(CF2),
|
||||||
|
tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
|
||||||
|
assertThat(tracker.notExecutedStores.get(0).getSecond(),
|
||||||
|
containsString("compaction request was cancelled"));
|
||||||
|
assertTrue(tracker.beforeExecuteStores.isEmpty());
|
||||||
|
assertTrue(tracker.afterExecuteStores.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSpaceQuotaViolation() throws IOException, InterruptedException {
|
||||||
|
region.getRegionServerServices().getRegionServerSpaceQuotaManager().enforceViolationPolicy(NAME,
|
||||||
|
new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 10L,
|
||||||
|
100L));
|
||||||
|
Tracker tracker = new Tracker();
|
||||||
|
TRACKER = tracker;
|
||||||
|
region.requestCompaction("test", Store.PRIORITY_USER, false, tracker);
|
||||||
|
tracker.await();
|
||||||
|
assertEquals(2, tracker.notExecutedStores.size());
|
||||||
|
tracker.notExecutedStores.sort((p1, p2) -> p1.getFirst().getColumnFamilyName()
|
||||||
|
.compareTo(p2.getFirst().getColumnFamilyName()));
|
||||||
|
|
||||||
|
assertEquals(Bytes.toString(CF1),
|
||||||
|
tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName());
|
||||||
|
assertThat(tracker.notExecutedStores.get(0).getSecond(),
|
||||||
|
containsString("space quota violation"));
|
||||||
|
|
||||||
|
assertEquals(Bytes.toString(CF2),
|
||||||
|
tracker.notExecutedStores.get(1).getFirst().getColumnFamilyName());
|
||||||
|
assertThat(tracker.notExecutedStores.get(1).getSecond(),
|
||||||
|
containsString("space quota violation"));
|
||||||
|
|
||||||
|
assertTrue(tracker.beforeExecuteStores.isEmpty());
|
||||||
|
assertTrue(tracker.afterExecuteStores.isEmpty());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue