HBASE-18453 CompactionRequest should not be exposed to user directly

zhangduo 2017-09-11 08:50:37 +08:00
parent 58e3b18ec0
commit 46d993b0ae
51 changed files with 809 additions and 966 deletions

ZooKeeperScanPolicyObserver.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -203,7 +204,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
   @Override
   public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
       Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
-      InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
+      InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
     ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
     if (scanInfo == null) {
       // take default action

TestRefreshHFilesEndpoint.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -144,15 +145,17 @@ public class TestRefreshHFilesEndpoint {
     }

     @Override
-    public List<Store> getStores() {
-      List<Store> list = new ArrayList<Store>(stores.size());
+    public List<HStore> getStores() {
+      List<HStore> list = new ArrayList<>(stores.size());
       /**
        * This is used to trigger the custom definition (faulty)
        * of refresh HFiles API.
        */
       try {
-        if (this.store == null)
-          store = new HStoreWithFaultyRefreshHFilesAPI(this, new HColumnDescriptor(FAMILY), this.conf);
+        if (this.store == null) {
+          store = new HStoreWithFaultyRefreshHFilesAPI(this,
+            ColumnFamilyDescriptorBuilder.of(FAMILY), this.conf);
+        }
         list.add(store);
       } catch (IOException ioe) {
         LOG.info("Couldn't instantiate custom store implementation", ioe);

CompactionTool.java

@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;

 import org.apache.commons.logging.Log;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.mapreduce.JobUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -158,10 +160,13 @@ public class CompactionTool extends Configured implements Tool {
         store.triggerMajorCompaction();
       }
       do {
-        CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
-        if (compaction == null) break;
+        Optional<CompactionContext> compaction =
+          store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
+        if (!compaction.isPresent()) {
+          break;
+        }
         List<StoreFile> storeFiles =
-            store.compact(compaction, NoLimitThroughputController.INSTANCE);
+            store.compact(compaction.get(), NoLimitThroughputController.INSTANCE);
         if (storeFiles != null && !storeFiles.isEmpty()) {
           if (keepCompactedFiles && deleteCompacted) {
             for (StoreFile storeFile: storeFiles) {
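Reading the changed rows above as plain code: the tool now has to unwrap the Optional before compacting. A rough sketch of the rewritten loop; the surrounding do/while condition is assumed, since it is not part of the hunk.

// Sketch of CompactionTool's per-store loop after this change (assumed surrounding context).
do {
  Optional<CompactionContext> compaction =
      store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
  if (!compaction.isPresent()) {
    break; // nothing left to compact
  }
  List<StoreFile> storeFiles =
      store.compact(compaction.get(), NoLimitThroughputController.INSTANCE);
  // the tool then archives or deletes the compacted input files depending on its flags
} while (store.needsCompaction()); // assumed loop condition; not shown in the hunk above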

RegionObserver.java

@@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hbase.util.Pair;
@@ -186,10 +186,10 @@ public interface RegionObserver extends Coprocessor {
   * @param c the environment provided by the region server
   * @param store the store where compaction is being requested
   * @param candidates the store files currently available for compaction
-  * @param request custom compaction request
+  * @param tracker tracker used to track the life cycle of a compaction
   */
  default void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-     List<StoreFile> candidates, CompactionRequest request) throws IOException {}
+     List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {}

  /**
   * Called after the {@link StoreFile}s to compact have been selected from the available
@@ -197,10 +197,10 @@ public interface RegionObserver extends Coprocessor {
   * @param c the environment provided by the region server
   * @param store the store being compacted
   * @param selected the store files selected to compact
-  * @param request custom compaction request
+  * @param tracker tracker used to track the life cycle of a compaction
   */
  default void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-     ImmutableList<StoreFile> selected, CompactionRequest request) {}
+     ImmutableList<StoreFile> selected, CompactionLifeCycleTracker tracker) {}

  /**
   * Called prior to writing the {@link StoreFile}s selected for compaction into a new
@@ -220,13 +220,13 @@ public interface RegionObserver extends Coprocessor {
   * @param store the store being compacted
   * @param scanner the scanner over existing data used in the store file rewriting
   * @param scanType type of Scan
-  * @param request the requested compaction
+  * @param tracker tracker used to track the life cycle of a compaction
   * @return the scanner to use during compaction. Should not be {@code null} unless the
   *         implementation is writing new store files on its own.
   */
- default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
-     Store store, InternalScanner scanner, ScanType scanType,
-     CompactionRequest request) throws IOException {
+ default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+     InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
+     throws IOException {
    return scanner;
  }
@@ -245,14 +245,14 @@ public interface RegionObserver extends Coprocessor {
   * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
   *          files
   * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
-  * @param request compaction request
+  * @param tracker used to track the life cycle of a compaction
   * @param readPoint the readpoint to create scanner
   * @return the scanner to use during compaction. {@code null} if the default implementation is to
   *         be used.
   */
  default InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
-     InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
+     InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
    return s;
  }
@@ -261,10 +261,10 @@ public interface RegionObserver extends Coprocessor {
   * @param c the environment provided by the region server
   * @param store the store being compacted
   * @param resultFile the new store file written out during compaction
-  * @param request the requested compaction
+  * @param tracker used to track the life cycle of a compaction
   */
  default void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-     StoreFile resultFile, CompactionRequest request) throws IOException {}
+     StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {}

  /**
   * Called before the region is reported as closed to the master.
@@ -798,12 +798,12 @@ public interface RegionObserver extends Coprocessor {
   * Called before a store opens a new scanner.
   * This hook is called when a "user" scanner is opened.
   * <p>
-  * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext,
-  * Store, List, ScanType, long, InternalScanner, CompactionRequest, long)}
-  * to override scanners created for flushes or compactions, resp.
+  * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
+  * and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
+  * InternalScanner, CompactionLifeCycleTracker, long)} to override scanners created for flushes
+  * or compactions, resp.
   * <p>
-  * Call CoprocessorEnvironment#complete to skip any subsequent chained
-  * coprocessors.
+  * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors.
   * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
   * effect in this hook.
   * <p>
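For downstream coprocessors the migration is mechanical: every compaction hook that used to receive a CompactionRequest now receives a CompactionLifeCycleTracker. A minimal sketch of an observer written against the signatures declared above; the class name and comments are hypothetical, and any lifecycle methods inherited from Coprocessor are omitted for brevity.

import java.io.IOException;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;

// Hypothetical observer showing the tracker-based hook signatures from this commit.
public class ExampleCompactionObserver implements RegionObserver {

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
      throws IOException {
    // The hook no longer sees the CompactionRequest; only the opaque tracker is passed through.
    return scanner;
  }

  @Override
  public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
    // resultFile is the store file produced by the compaction.
  }
}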

CompactSplit.java

@@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -41,24 +40,23 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.StealJobQueue;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;

 /**
  * Compact region on request and then run split if appropriate
  */
 @InterfaceAudience.Private
-public class CompactSplit implements CompactionRequestor, PropagatingConfigurationObserver {
+public class CompactSplit implements PropagatingConfigurationObserver {
   private static final Log LOG = LogFactory.getLog(CompactSplit.class);

   // Configuration key for the large compaction threads.
@@ -233,126 +231,89 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
     }
   }

-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why)
-      throws IOException {
-    return requestCompaction(r, why, null);
-  }
-
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
-      List<Pair<CompactionRequest, Store>> requests) throws IOException {
-    return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
-  }
-
-  @Override
-  public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
-      final String why, CompactionRequest request) throws IOException {
-    return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
-  }
-
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
-      int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
-    return requestCompactionInternal(r, why, p, requests, true, user);
-  }
-
-  private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
-      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
-      throws IOException {
-    // not a special compaction request, so make our own list
-    List<CompactionRequest> ret = null;
-    if (requests == null) {
-      ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
-      for (Store s : r.getStores()) {
-        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
-        if (selectNow) ret.add(cr);
-      }
-    } else {
-      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
-      ret = new ArrayList<CompactionRequest>(requests.size());
-      for (Pair<CompactionRequest, Store> pair : requests) {
-        ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
-      }
-    }
-    return ret;
-  }
-
-  public CompactionRequest requestCompaction(final Region r, final Store s,
-      final String why, int priority, CompactionRequest request, User user) throws IOException {
-    return requestCompactionInternal(r, s, why, priority, request, true, user);
-  }
-
-  public synchronized void requestSystemCompaction(
-      final Region r, final String why) throws IOException {
-    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
-  }
-
-  public void requestSystemCompaction(
-      final Region r, final Store s, final String why) throws IOException {
-    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
-  }
-
-  /**
-   * @param r region store belongs to
-   * @param s Store to request compaction on
-   * @param why Why compaction requested -- used in debug messages
-   * @param priority override the default priority (NO_PRIORITY == decide)
-   * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
-   *          compaction will be used.
-   */
-  private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
-      final String why, int priority, CompactionRequest request, boolean selectNow, User user)
-      throws IOException {
-    if (this.server.isStopped()
-        || (r.getTableDescriptor() != null && !r.getTableDescriptor().isCompactionEnabled())) {
-      return null;
-    }
-    CompactionContext compaction = null;
-    if (selectNow) {
-      compaction = selectCompaction(r, s, priority, request, user);
-      if (compaction == null) return null; // message logged inside
-    }
-    final RegionServerSpaceQuotaManager spaceQuotaManager =
-        this.server.getRegionServerSpaceQuotaManager();
-    if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled(
-        r.getTableDescriptor().getTableName())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation "
-            + " policy disallows compactions.");
-      }
-      return null;
-    }
-    // We assume that most compactions are small. So, put system compactions into small
-    // pool; we will do selection there, and move to large pool if necessary.
-    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
-      ? longCompactions : shortCompactions;
-    pool.execute(new CompactionRunner(s, r, compaction, pool, user));
-    ((HRegion)r).incrementCompactionsQueuedCount();
-    if (LOG.isDebugEnabled()) {
-      String type = (pool == shortCompactions) ? "Small " : "Large ";
-      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
-          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
-    }
-    return selectNow ? compaction.getRequest() : null;
-  }
-
-  private CompactionContext selectCompaction(final Region r, final Store s,
-      int priority, CompactionRequest request, User user) throws IOException {
-    CompactionContext compaction = s.requestCompaction(priority, request, user);
-    if (compaction == null) {
-      if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
-        LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
-            " because compaction request was cancelled");
-      }
-      return null;
-    }
-    assert compaction.hasSelection();
-    if (priority != Store.NO_PRIORITY) {
-      compaction.getRequest().setPriority(priority);
-    }
-    return compaction;
-  }
+  public synchronized void requestCompaction(HRegion region, String why, int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    requestCompactionInternal(region, why, priority, true, tracker, user);
+  }
+
+  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    requestCompactionInternal(region, store, why, priority, true, tracker, user);
+  }
+
+  private void requestCompactionInternal(HRegion region, String why, int priority,
+      boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
+    // request compaction on all stores
+    for (HStore store : region.stores.values()) {
+      requestCompactionInternal(region, store, why, priority, selectNow, tracker, user);
+    }
+  }
+
+  private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
+      boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException {
+    if (this.server.isStopped() || (region.getTableDescriptor() != null &&
+        !region.getTableDescriptor().isCompactionEnabled())) {
+      return;
+    }
+    Optional<CompactionContext> compaction;
+    if (selectNow) {
+      compaction = selectCompaction(region, store, priority, tracker, user);
+      if (!compaction.isPresent()) {
+        // message logged inside
+        return;
+      }
+    } else {
+      compaction = Optional.empty();
+    }
+    RegionServerSpaceQuotaManager spaceQuotaManager =
+        this.server.getRegionServerSpaceQuotaManager();
+    if (spaceQuotaManager != null &&
+        spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring compaction request for " + region +
+            " as an active space quota violation " + " policy disallows compactions.");
+      }
+      return;
+    }
+    ThreadPoolExecutor pool;
+    if (selectNow) {
+      // compaction.get is safe as we will just return if selectNow is true but no compaction is
+      // selected
+      pool = store.throttleCompaction(compaction.get().getRequest().getSize()) ? longCompactions
+          : shortCompactions;
+    } else {
+      // We assume that most compactions are small. So, put system compactions into small
+      // pool; we will do selection there, and move to large pool if necessary.
+      pool = shortCompactions;
+    }
+    pool.execute(new CompactionRunner(store, region, compaction, pool, user));
+    region.incrementCompactionsQueuedCount();
+    if (LOG.isDebugEnabled()) {
+      String type = (pool == shortCompactions) ? "Small " : "Large ";
+      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
+          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
+    }
+  }
+
+  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
+    requestCompactionInternal(region, why, Store.NO_PRIORITY, false,
+      CompactionLifeCycleTracker.DUMMY, null);
+  }
+
+  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
+      throws IOException {
+    requestCompactionInternal(region, store, why, Store.NO_PRIORITY, false,
+      CompactionLifeCycleTracker.DUMMY, null);
+  }
+
+  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
+    if (!compaction.isPresent() && LOG.isDebugEnabled() && region.getRegionInfo() != null) {
+      LOG.debug("Not compacting " + region.getRegionInfo().getRegionNameAsString() +
+          " because compaction request was cancelled");
+    }
+    return compaction;
+  }
@@ -468,33 +429,33 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
       if (cmp != 0) {
         return cmp;
       }
-      CompactionContext c1 = o1.compaction;
-      CompactionContext c2 = o2.compaction;
-      if (c1 == null) {
-        return c2 == null ? 0 : 1;
+      Optional<CompactionContext> c1 = o1.compaction;
+      Optional<CompactionContext> c2 = o2.compaction;
+      if (c1.isPresent()) {
+        return c2.isPresent() ? compare(c1.get().getRequest(), c2.get().getRequest()) : -1;
       } else {
-        return c2 == null ? -1 : compare(c1.getRequest(), c2.getRequest());
+        return c2.isPresent() ? 1 : 0;
       }
     }
   };

   private final class CompactionRunner implements Runnable {
-    private final Store store;
+    private final HStore store;
     private final HRegion region;
-    private CompactionContext compaction;
+    private final Optional<CompactionContext> compaction;
     private int queuedPriority;
     private ThreadPoolExecutor parent;
     private User user;
     private long time;

-    public CompactionRunner(Store store, Region region, CompactionContext compaction,
+    public CompactionRunner(HStore store, HRegion region, Optional<CompactionContext> compaction,
         ThreadPoolExecutor parent, User user) {
       super();
       this.store = store;
-      this.region = (HRegion) region;
+      this.region = region;
       this.compaction = compaction;
-      this.queuedPriority =
-          compaction == null ? store.getCompactPriority() : compaction.getRequest().getPriority();
+      this.queuedPriority = compaction.isPresent() ? compaction.get().getRequest().getPriority()
+          : store.getCompactPriority();
       this.parent = parent;
       this.user = user;
       this.time = System.currentTimeMillis();
@@ -502,14 +463,15 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
     @Override
     public String toString() {
-      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
-          : ("regionName = " + region.toString() + ", storeName = " + store.toString() +
+      return compaction.map(c -> "Request = " + c.getRequest())
+          .orElse("regionName = " + region.toString() + ", storeName = " + store.toString() +
              ", priority = " + queuedPriority + ", time = " + time);
     }

     private void doCompaction(User user) {
+      CompactionContext c;
       // Common case - system compaction without a file selection. Select now.
-      if (this.compaction == null) {
+      if (!compaction.isPresent()) {
         int oldPriority = this.queuedPriority;
         this.queuedPriority = this.store.getCompactPriority();
         if (this.queuedPriority > oldPriority) {
@@ -518,44 +480,49 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
           this.parent.execute(this);
           return;
         }
+        Optional<CompactionContext> selected;
         try {
-          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
+          selected = selectCompaction(this.region, this.store, queuedPriority,
+            CompactionLifeCycleTracker.DUMMY, user);
         } catch (IOException ex) {
           LOG.error("Compaction selection failed " + this, ex);
           server.checkFileSystem();
           region.decrementCompactionsQueuedCount();
           return;
         }
-        if (this.compaction == null) {
+        if (!selected.isPresent()) {
           region.decrementCompactionsQueuedCount();
           return; // nothing to do
         }
+        c = selected.get();
+        assert c.hasSelection();
         // Now see if we are in correct pool for the size; if not, go to the correct one.
         // We might end up waiting for a while, so cancel the selection.
-        assert this.compaction.hasSelection();
-        ThreadPoolExecutor pool = store.throttleCompaction(
-            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
+        ThreadPoolExecutor pool =
+            store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;
         // Long compaction pool can process small job
         // Short compaction pool should not process large job
         if (this.parent == shortCompactions && pool == longCompactions) {
-          this.store.cancelRequestedCompaction(this.compaction);
-          this.compaction = null;
+          this.store.cancelRequestedCompaction(c);
           this.parent = pool;
           this.parent.execute(this);
           return;
         }
+      } else {
+        c = compaction.get();
       }
       // Finally we can compact something.
-      assert this.compaction != null;
-      this.compaction.getRequest().beforeExecute();
+      assert c != null;
+      c.getRequest().getTracker().beforeExecute(store);
       try {
         // Note: please don't put single-compaction logic here;
         // put it into region/store/etc. This is CST logic.
         long start = EnvironmentEdgeManager.currentTime();
         boolean completed =
-            region.compact(compaction, store, compactionThroughputController, user);
+            region.compact(c, store, compactionThroughputController, user);
         long now = EnvironmentEdgeManager.currentTime();
         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
             this + "; duration=" + StringUtils.formatTimeDiff(now, start));
@@ -582,10 +549,10 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
         region.reportCompactionRequestFailure();
         server.checkFileSystem();
       } finally {
+        c.getRequest().getTracker().afterExecute(store);
         region.decrementCompactionsQueuedCount();
         LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
       }
-      this.compaction.getRequest().afterExecute();
     }
@@ -615,9 +582,9 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
     @Override
     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
       if (runnable instanceof CompactionRunner) {
-        CompactionRunner runner = (CompactionRunner)runnable;
+        CompactionRunner runner = (CompactionRunner) runnable;
         LOG.debug("Compaction Rejected: " + runner);
-        runner.store.cancelRequestedCompaction(runner.compaction);
+        runner.compaction.ifPresent(c -> runner.store.cancelRequestedCompaction(c));
       }
     }
   }
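CompactionRunner.doCompaction() above brackets the actual compaction with tracker.beforeExecute(store) and tracker.afterExecute(store), and CompactSplit falls back to CompactionLifeCycleTracker.DUMMY for system compactions. A minimal sketch of a custom tracker, assuming the interface declares (with default bodies) exactly the two callbacks used here:

import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;

// Hypothetical tracker that times a compaction from execution start to finish.
public class TimingCompactionTracker implements CompactionLifeCycleTracker {

  private volatile long startMillis;

  @Override
  public void beforeExecute(Store store) {
    // invoked by CompactionRunner.doCompaction() right before the store is compacted
    startMillis = System.currentTimeMillis();
  }

  @Override
  public void afterExecute(Store store) {
    // invoked from the finally block once the compaction attempt has finished
    long elapsedMillis = System.currentTimeMillis() - startMillis;
    System.out.println("Compaction of " + store.getColumnFamilyName()
        + " took " + elapsedMillis + " ms");
  }
}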

CompactionRequestor.java (deleted)

@@ -1,100 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
@InterfaceAudience.Private
public interface CompactionRequestor {
/**
* @param r Region to compact
* @param why Why compaction was requested -- used in debug messages
* @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
* compactions were started
* @throws IOException
*/
List<CompactionRequest> requestCompaction(final Region r, final String why)
throws IOException;
/**
* @param r Region to compact
* @param why Why compaction was requested -- used in debug messages
* @param requests custom compaction requests. Each compaction must specify the store on which it
* is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
* stores for the region.
* @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
* compactions were started
* @throws IOException
*/
List<CompactionRequest> requestCompaction(
final Region r, final String why, List<Pair<CompactionRequest, Store>> requests
)
throws IOException;
/**
* @param r Region to compact
* @param s Store within region to compact
* @param why Why compaction was requested -- used in debug messages
* @param request custom compaction request for the {@link Region} and {@link Store}. Custom
* request must be <tt>null</tt> or be constructed with matching region and store.
* @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started.
* @throws IOException
*/
CompactionRequest requestCompaction(
final Region r, final Store s, final String why, CompactionRequest request
) throws IOException;
/**
* @param r Region to compact
* @param why Why compaction was requested -- used in debug messages
* @param pri Priority of this compaction. minHeap. &lt;=0 is critical
* @param requests custom compaction requests. Each compaction must specify the store on which it
* is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
* stores for the region.
* @param user the effective user
* @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
* compactions were started.
* @throws IOException
*/
List<CompactionRequest> requestCompaction(
final Region r, final String why, int pri, List<Pair<CompactionRequest, Store>> requests,
User user
) throws IOException;
/**
* @param r Region to compact
* @param s Store within region to compact
* @param why Why compaction was requested -- used in debug messages
* @param pri Priority of this compaction. minHeap. &lt;=0 is critical
* @param request custom compaction request to run. {@link Store} and {@link Region} for the
* request must match the region and store specified here.
* @param user
* @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started
* @throws IOException
*/
CompactionRequest requestCompaction(
final Region r, final Store s, final String why, int pri, CompactionRequest request, User user
) throws IOException;
}
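The methods above now live, in simplified form, on CompactSplit itself (see the CompactSplit hunks earlier in this commit). Roughly, a caller that used to build a CompactionRequest now passes a priority and a tracker instead; a hedged sketch with illustrative variable names:

// Before: go through the CompactionRequestor interface with a caller-built CompactionRequest.
//   compactSplitThread.requestCompaction(region, store, "user request", Store.PRIORITY_USER,
//       new CompactionRequest(), user);

// After HBASE-18453: CompactionRequest stays internal to the region server; callers only
// supply a priority and a CompactionLifeCycleTracker (DUMMY when they do not care).
compactSplitThread.requestCompaction(region, store, "user request", Store.PRIORITY_USER,
    CompactionLifeCycleTracker.DUMMY, user);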

FlushAllLargeStoresPolicy.java

@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * enough, then all stores will be flushed.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
+public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy {

   private static final Log LOG = LogFactory.getLog(FlushAllLargeStoresPolicy.class);
@@ -48,20 +48,22 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
   }

   @Override
-  public Collection<Store> selectStoresToFlush() {
+  public Collection<HStore> selectStoresToFlush() {
     // no need to select stores if only one family
     if (region.getTableDescriptor().getColumnFamilyCount() == 1) {
       return region.stores.values();
     }
     // start selection
-    Collection<Store> stores = region.stores.values();
-    Set<Store> specificStoresToFlush = new HashSet<>();
-    for (Store store : stores) {
+    Collection<HStore> stores = region.stores.values();
+    Set<HStore> specificStoresToFlush = new HashSet<>();
+    for (HStore store : stores) {
       if (shouldFlush(store)) {
         specificStoresToFlush.add(store);
       }
     }
-    if (!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
+    if (!specificStoresToFlush.isEmpty()) {
+      return specificStoresToFlush;
+    }

     // Didn't find any CFs which were above the threshold for selection.
     if (LOG.isDebugEnabled()) {
@@ -71,8 +73,8 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
   }

   @Override
-  protected boolean shouldFlush(Store store) {
-    return (super.shouldFlush(store) || region.shouldFlushStore(store));
+  protected boolean shouldFlush(HStore store) {
+    return super.shouldFlush(store) || region.shouldFlushStore(store);
   }
 }

FlushAllStoresPolicy.java

@@ -28,7 +28,7 @@
 public class FlushAllStoresPolicy extends FlushPolicy {

   @Override
-  public Collection<Store> selectStoresToFlush() {
+  public Collection<HStore> selectStoresToFlush() {
     return region.stores.values();
   }

FlushLargeStoresPolicy.java

@@ -77,7 +77,7 @@ public abstract class FlushLargeStoresPolicy extends FlushPolicy {
     return flushSizeLowerBound;
   }

-  protected boolean shouldFlush(Store store) {
+  protected boolean shouldFlush(HStore store) {
     if (store.getSizeOfMemStore().getDataSize() > this.flushSizeLowerBound) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " +

FlushNonSloppyStoresFirstPolicy.java

@@ -32,26 +32,31 @@
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy {

-  private Collection<Store> regularStores = new HashSet<>();
-  private Collection<Store> sloppyStores = new HashSet<>();
+  private Collection<HStore> regularStores = new HashSet<>();
+  private Collection<HStore> sloppyStores = new HashSet<>();

   /**
    * @return the stores need to be flushed.
    */
-  @Override public Collection<Store> selectStoresToFlush() {
-    Collection<Store> specificStoresToFlush = new HashSet<>();
-    for(Store store : regularStores) {
-      if(shouldFlush(store) || region.shouldFlushStore(store)) {
+  @Override
+  public Collection<HStore> selectStoresToFlush() {
+    Collection<HStore> specificStoresToFlush = new HashSet<>();
+    for (HStore store : regularStores) {
+      if (shouldFlush(store) || region.shouldFlushStore(store)) {
         specificStoresToFlush.add(store);
       }
     }
-    if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
-    for(Store store : sloppyStores) {
-      if(shouldFlush(store)) {
+    if (!specificStoresToFlush.isEmpty()) {
+      return specificStoresToFlush;
+    }
+    for (HStore store : sloppyStores) {
+      if (shouldFlush(store)) {
         specificStoresToFlush.add(store);
       }
     }
-    if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
+    if (!specificStoresToFlush.isEmpty()) {
+      return specificStoresToFlush;
+    }
     return region.stores.values();
   }
@@ -59,8 +64,8 @@ public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy {
   protected void configureForRegion(HRegion region) {
     super.configureForRegion(region);
     this.flushSizeLowerBound = getFlushSizeLowerBound(region);
-    for(Store store : region.stores.values()) {
-      if(store.isSloppyMemstore()) {
+    for (HStore store : region.stores.values()) {
+      if (store.isSloppyMemstore()) {
         sloppyStores.add(store);
       } else {
         regularStores.add(store);

FlushPolicy.java

@@ -44,6 +44,6 @@ public abstract class FlushPolicy extends Configured {
   /**
    * @return the stores need to be flushed.
    */
-  public abstract Collection<Store> selectStoresToFlush();
+  public abstract Collection<HStore> selectStoresToFlush();
 }

HRegionServer.java

@@ -56,8 +56,8 @@ import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import javax.servlet.http.HttpServlet;

-import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -140,6 +141,9 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
@@ -210,10 +214,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;

-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
-
 import sun.misc.Signal;
 import sun.misc.SignalHandler;
@@ -1686,7 +1686,7 @@ public class HRegionServer extends HasThread implements
     int totalStaticBloomSizeKB = 0;
     long totalCompactingKVs = 0;
     long currentCompactedKVs = 0;
-    List<Store> storeList = r.getStores();
+    List<? extends Store> storeList = r.getStores();
     stores += storeList.size();
     for (Store store : storeList) {
       storefiles += store.getStorefilesCount();
@@ -1772,27 +1772,32 @@ public class HRegionServer extends HasThread implements
     @Override
     protected void chore() {
       for (Region r : this.instance.onlineRegions.values()) {
-        if (r == null)
+        if (r == null) {
           continue;
-        for (Store s : r.getStores()) {
+        }
+        HRegion hr = (HRegion) r;
+        for (HStore s : hr.stores.values()) {
           try {
             long multiplier = s.getCompactionCheckMultiplier();
             assert multiplier > 0;
-            if (iteration % multiplier != 0) continue;
+            if (iteration % multiplier != 0) {
+              continue;
+            }
             if (s.needsCompaction()) {
               // Queue a compaction. Will recognize if major is needed.
-              this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
-                  + " requests compaction");
+              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
+                getName() + " requests compaction");
             } else if (s.isMajorCompaction()) {
               s.triggerMajorCompaction();
-              if (majorCompactPriority == DEFAULT_PRIORITY
-                  || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
-                this.instance.compactSplitThread.requestCompaction(r, s, getName()
-                    + " requests major compaction; use default priority", null);
+              if (majorCompactPriority == DEFAULT_PRIORITY ||
+                  majorCompactPriority > hr.getCompactPriority()) {
+                this.instance.compactSplitThread.requestCompaction(hr, s,
+                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
+                  CompactionLifeCycleTracker.DUMMY, null);
               } else {
-                this.instance.compactSplitThread.requestCompaction(r, s, getName()
-                    + " requests major compaction; use configured priority",
-                    this.majorCompactPriority, null, null);
+                this.instance.compactSplitThread.requestCompaction(hr, s,
+                  getName() + " requests major compaction; use configured priority",
+                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
               }
             }
           } catch (IOException e) {
@@ -2146,15 +2151,14 @@ public class HRegionServer extends HasThread implements
   @Override
   public void postOpenDeployTasks(final PostOpenDeployContext context)
       throws KeeperException, IOException {
-    Region r = context.getRegion();
+    HRegion r = (HRegion) context.getRegion();
     long masterSystemTime = context.getMasterSystemTime();
-    Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
     rpcServices.checkOpen();
     LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
     // Do checks to see if we need to compact (references or too many files)
-    for (Store s : r.getStores()) {
+    for (HStore s : r.stores.values()) {
       if (s.hasReferences() || s.needsCompaction()) {
         this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
       }
     }
     long openSeqNum = r.getOpenSeqNum();
@@ -2863,11 +2867,6 @@ public class HRegionServer extends HasThread implements
     return serverName;
   }

-  @Override
-  public CompactionRequestor getCompactionRequester() {
-    return this.compactSplitThread;
-  }
-
   public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
     return this.rsHost;
   }

HStore.java

@@ -30,6 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -52,13 +53,12 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.FailedArchiveException;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
@@ -82,8 +83,6 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -92,14 +91,16 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+import org.apache.yetus.audience.InterfaceAudience;

-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableCollection;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;

 /**
  * A Store holds a column family in a Region. Its a memstore and a set of zero
@@ -477,7 +478,7 @@ public class HStore implements Store {
   /**
    * @param tabledir {@link Path} to where the table is being stored
    * @param hri {@link HRegionInfo} for the region.
-   * @param family {@link HColumnDescriptor} describing the column family
+   * @param family {@link ColumnFamilyDescriptor} describing the column family
    * @return Path to family/Store home directory.
    */
   @Deprecated
@@ -489,7 +490,7 @@ public class HStore implements Store {
   /**
    * @param tabledir {@link Path} to where the table is being stored
    * @param encodedName Encoded region name.
-   * @param family {@link HColumnDescriptor} describing the column family
+   * @param family {@link ColumnFamilyDescriptor} describing the column family
    * @return Path to family/Store home directory.
    */
   @Deprecated
@ -1386,15 +1387,14 @@ public class HStore implements Store {
} }
} }
private List<StoreFile> moveCompatedFilesIntoPlace( private List<StoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles,
final CompactionRequest cr, List<Path> newFiles, User user) throws IOException { User user) throws IOException {
List<StoreFile> sfs = new ArrayList<>(newFiles.size()); List<StoreFile> sfs = new ArrayList<>(newFiles.size());
for (Path newFile : newFiles) { for (Path newFile : newFiles) {
assert newFile != null; assert newFile != null;
final StoreFile sf = moveFileIntoPlace(newFile); StoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) { if (this.getCoprocessorHost() != null) {
final Store thisStore = this; getCoprocessorHost().postCompact(this, sf, cr.getTracker(), user);
getCoprocessorHost().postCompact(thisStore, sf, cr, user);
} }
assert sf != null; assert sf != null;
sfs.add(sf); sfs.add(sf);
@ -1636,23 +1636,12 @@ public class HStore implements Store {
} }
@Override @Override
public CompactionContext requestCompaction() throws IOException { public Optional<CompactionContext> requestCompaction(int priority,
return requestCompaction(Store.NO_PRIORITY, null); CompactionLifeCycleTracker tracker, User user) throws IOException {
}
@Override
public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
throws IOException {
return requestCompaction(priority, baseRequest, null);
}
@Override
public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
User user) throws IOException {
// don't even select for compaction if writes are disabled // don't even select for compaction if writes are disabled
if (!this.areWritesEnabled()) { if (!this.areWritesEnabled()) {
return null; return Optional.empty();
} }
// Before we do compaction, try to get rid of unneeded files to simplify things. // Before we do compaction, try to get rid of unneeded files to simplify things.
removeUnneededFiles(); removeUnneededFiles();
@ -1666,7 +1655,7 @@ public class HStore implements Store {
final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
boolean override = false; boolean override = false;
override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
baseRequest, user); tracker, user);
if (override) { if (override) {
// Coprocessor is overriding normal file selection. // Coprocessor is overriding normal file selection.
compaction.forceSelect(new CompactionRequest(candidatesForCoproc)); compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
@ -1695,21 +1684,13 @@ public class HStore implements Store {
} }
if (this.getCoprocessorHost() != null) { if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompactSelection( this.getCoprocessorHost().postCompactSelection(
this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest, user); this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, user);
}
// Selected files; see if we have a compaction with some custom base request.
if (baseRequest != null) {
// Update the request with what the system thinks the request should be;
// its up to the request if it wants to listen.
compaction.forceSelect(
baseRequest.combineWith(compaction.getRequest()));
} }
// Finally, we have the resulting files list. Check if we have any files at all. // Finally, we have the resulting files list. Check if we have any files at all.
request = compaction.getRequest(); request = compaction.getRequest();
final Collection<StoreFile> selectedFiles = request.getFiles(); Collection<StoreFile> selectedFiles = request.getFiles();
if (selectedFiles.isEmpty()) { if (selectedFiles.isEmpty()) {
return null; return Optional.empty();
} }
addToCompactingFiles(selectedFiles); addToCompactingFiles(selectedFiles);
@ -1721,6 +1702,7 @@ public class HStore implements Store {
// Set priority, either override value supplied by caller or from store. // Set priority, either override value supplied by caller or from store.
request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority()); request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName()); request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
request.setTracker(tracker);
} }
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
@ -1730,7 +1712,7 @@ public class HStore implements Store {
+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction" + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
+ (request.isAllFiles() ? " (all files)" : "")); + (request.isAllFiles() ? " (all files)" : ""));
this.region.reportCompactionRequestStart(request.isMajor()); this.region.reportCompactionRequestStart(request.isMajor());
return compaction; return Optional.of(compaction);
} }
/** Adds the files to compacting files. filesCompacting must be locked. */ /** Adds the files to compacting files. filesCompacting must be locked. */
View File
@ -20,11 +20,8 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.util.StringUtils.humanReadableInt; import static org.apache.hadoop.util.StringUtils.humanReadableInt;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import java.io.IOException; import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.MemoryType;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import java.util.HashMap; import java.util.HashMap;
@ -50,6 +47,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.HasThread;
@ -448,8 +446,8 @@ class MemStoreFlusher implements FlushRequester {
"store files; delaying flush up to " + this.blockingWaitTime + "ms"); "store files; delaying flush up to " + this.blockingWaitTime + "ms");
if (!this.server.compactSplitThread.requestSplit(region)) { if (!this.server.compactSplitThread.requestSplit(region)) {
try { try {
this.server.compactSplitThread.requestSystemCompaction( this.server.compactSplitThread.requestSystemCompaction((HRegion) region,
region, Thread.currentThread().getName()); Thread.currentThread().getName());
} catch (IOException e) { } catch (IOException e) {
e = e instanceof RemoteException ? e = e instanceof RemoteException ?
((RemoteException)e).unwrapRemoteException() : e; ((RemoteException)e).unwrapRemoteException() : e;
@ -503,8 +501,8 @@ class MemStoreFlusher implements FlushRequester {
if (shouldSplit) { if (shouldSplit) {
this.server.compactSplitThread.requestSplit(region); this.server.compactSplitThread.requestSplit(region);
} else if (shouldCompact) { } else if (shouldCompact) {
server.compactSplitThread.requestSystemCompaction( server.compactSplitThread.requestSystemCompaction((HRegion) region,
region, Thread.currentThread().getName()); Thread.currentThread().getName());
} }
} catch (DroppedSnapshotException ex) { } catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical // Cache flush can fail in a few places. If it fails in a critical
View File
@ -761,7 +761,7 @@ class MetricsRegionServerWrapperImpl
tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed(); tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed();
tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed(); tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed();
tempBlockedRequestsCount += r.getBlockedRequestsCount(); tempBlockedRequestsCount += r.getBlockedRequestsCount();
List<Store> storeList = r.getStores(); List<? extends Store> storeList = r.getStores();
tempNumStores += storeList.size(); tempNumStores += storeList.size();
for (Store store : storeList) { for (Store store : storeList) {
tempNumStoreFiles += store.getStorefilesCount(); tempNumStoreFiles += store.getStorefilesCount();
View File
@ -95,7 +95,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
@Override @Override
public long getNumStores() { public long getNumStores() {
Map<byte[],Store> stores = this.region.stores; Map<byte[], HStore> stores = this.region.stores;
if (stores == null) { if (stores == null) {
return 0; return 0;
} }
View File
@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.regionserver.Leases.Lease;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
@ -1538,7 +1539,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try { try {
checkOpen(); checkOpen();
requestCount.increment(); requestCount.increment();
Region region = getRegion(request.getRegion()); HRegion region = (HRegion) getRegion(request.getRegion());
// Quota support is enabled, the requesting user is not system/super user // Quota support is enabled, the requesting user is not system/super user
// and a quota policy is enforced that disables compactions. // and a quota policy is enforced that disables compactions.
if (QuotaUtil.isQuotaEnabled(getConfiguration()) && if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
@ -1552,7 +1553,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
boolean major = false; boolean major = false;
byte [] family = null; byte [] family = null;
Store store = null; HStore store = null;
if (request.hasFamily()) { if (request.hasFamily()) {
family = request.getFamily().toByteArray(); family = request.getFamily().toByteArray();
store = region.getStore(family); store = region.getStore(family);
@ -1579,12 +1580,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
+ region.getRegionInfo().getRegionNameAsString() + familyLogMsg); + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
} }
String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg; String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
if(family != null) { if (family != null) {
regionServer.compactSplitThread.requestCompaction(region, store, log, regionServer.compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER,
Store.PRIORITY_USER, null, RpcServer.getRequestUser()); CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser());
} else { } else {
regionServer.compactSplitThread.requestCompaction(region, log, regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER,
Store.PRIORITY_USER, null, RpcServer.getRequestUser()); CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser());
} }
return CompactRegionResponse.newBuilder().build(); return CompactRegionResponse.newBuilder().build();
} catch (IOException ie) { } catch (IOException ie) {
@ -1606,7 +1607,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try { try {
checkOpen(); checkOpen();
requestCount.increment(); requestCount.increment();
Region region = getRegion(request.getRegion()); HRegion region = (HRegion) getRegion(request.getRegion());
LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
boolean shouldFlush = true; boolean shouldFlush = true;
if (request.hasIfOlderThanTs()) { if (request.hasIfOlderThanTs()) {
@ -1617,8 +1618,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
request.getWriteFlushWalMarker() : false; request.getWriteFlushWalMarker() : false;
// Go behind the curtain so we can manage writing of the flush WAL marker // Go behind the curtain so we can manage writing of the flush WAL marker
HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl) HRegion.FlushResultImpl flushResult = region.flushcache(true, writeFlushWalMarker);
((HRegion)region).flushcache(true, writeFlushWalMarker);
boolean compactionNeeded = flushResult.isCompactionNeeded(); boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) { if (compactionNeeded) {
regionServer.compactSplitThread.requestSystemCompaction(region, regionServer.compactSplitThread.requestSystemCompaction(region,
View File
@ -23,10 +23,12 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.CompareOperator;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
@ -41,13 +43,17 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Service;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Service;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
/** /**
* Regions store data for a certain region of a table. It stores all columns * Regions store data for a certain region of a table. It stores all columns
@ -105,7 +111,7 @@ public interface Region extends ConfigurationObserver {
* <p>Use with caution. Exposed for use of fixup utilities. * <p>Use with caution. Exposed for use of fixup utilities.
* @return a list of the Stores managed by this region * @return a list of the Stores managed by this region
*/ */
List<Store> getStores(); List<? extends Store> getStores();
/** /**
* Return the Store for the given family * Return the Store for the given family
@ -115,7 +121,7 @@ public interface Region extends ConfigurationObserver {
Store getStore(byte[] family); Store getStore(byte[] family);
/** @return list of store file names for the given families */ /** @return list of store file names for the given families */
List<String> getStoreFileList(byte [][] columns); List<String> getStoreFileList(byte[][] columns);
/** /**
* Check the region's underlying store files, open the files that have not * Check the region's underlying store files, open the files that have not
@ -753,6 +759,18 @@ public interface Region extends ConfigurationObserver {
*/ */
CompactionState getCompactionState(); CompactionState getCompactionState();
/**
* Request compaction on this region.
*/
void requestCompaction(String why, int priority, CompactionLifeCycleTracker tracker, User user)
throws IOException;
/**
* Request compaction for the given family
*/
void requestCompaction(byte[] family, String why, int priority,
CompactionLifeCycleTracker tracker, User user) throws IOException;
/** Wait for all current flushes and compactions of the region to complete */ /** Wait for all current flushes and compactions of the region to complete */
void waitForFlushesAndCompactions(); void waitForFlushesAndCompactions();
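Note: a minimal caller sketch, assuming a Region, a column family name and a User are already in scope, of the two region-level methods added above; it is illustration only and not part of this patch.

    // Region-wide request; CompactionLifeCycleTracker.DUMMY is the no-op tracker
    // for callers that do not need lifecycle callbacks.
    region.requestCompaction("user-triggered compaction", Store.PRIORITY_USER,
        CompactionLifeCycleTracker.DUMMY, user);

    // The same request restricted to a single column family.
    region.requestCompaction(Bytes.toBytes("cf"), "per-family compaction", Store.PRIORITY_USER,
        CompactionLifeCycleTracker.DUMMY, user);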
View File
@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -499,18 +500,18 @@ public class RegionCoprocessorHost
/** /**
* See * See
* {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionRequest, long)} * InternalScanner, CompactionLifeCycleTracker, long)}
*/ */
public InternalScanner preCompactScannerOpen(final Store store, public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs, ScanType scanType, long earliestPutTs, CompactionLifeCycleTracker tracker, User user,
final CompactionRequest request, final User user, final long readPoint) throws IOException { long readPoint) throws IOException {
return execOperationWithResult(null, return execOperationWithResult(null,
coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) { coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
@Override @Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException { throws IOException {
setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType, setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
earliestPutTs, getResult(), request, readPoint)); earliestPutTs, getResult(), tracker, readPoint));
} }
}); });
} }
@ -520,17 +521,17 @@ public class RegionCoprocessorHost
* available candidates. * available candidates.
* @param store The store where compaction is being requested * @param store The store where compaction is being requested
* @param candidates The currently available store files * @param candidates The currently available store files
* @param request custom compaction request * @param tracker used to track the life cycle of a compaction
* @return If {@code true}, skip the normal selection process and use the current list * @return If {@code true}, skip the normal selection process and use the current list
* @throws IOException * @throws IOException
*/ */
public boolean preCompactSelection(final Store store, final List<StoreFile> candidates, public boolean preCompactSelection(Store store, List<StoreFile> candidates,
final CompactionRequest request, final User user) throws IOException { CompactionLifeCycleTracker tracker, User user) throws IOException {
return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
@Override @Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException { throws IOException {
oserver.preCompactSelection(ctx, store, candidates, request); oserver.preCompactSelection(ctx, store, candidates, tracker);
} }
}); });
} }
@ -540,21 +541,17 @@ public class RegionCoprocessorHost
* candidates. * candidates.
* @param store The store where compaction is being requested * @param store The store where compaction is being requested
* @param selected The store files selected to compact * @param selected The store files selected to compact
* @param request custom compaction * @param tracker used to track the life cycle of a compaction
*/ */
public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected, public void postCompactSelection(Store store, ImmutableList<StoreFile> selected,
final CompactionRequest request, final User user) { CompactionLifeCycleTracker tracker, User user) throws IOException {
try { execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { @Override
@Override public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
throws IOException { oserver.postCompactSelection(ctx, store, selected, tracker);
oserver.postCompactSelection(ctx, store, selected, request); }
} });
});
} catch (IOException e) {
LOG.warn(e);
}
} }
/** /**
@ -562,18 +559,17 @@ public class RegionCoprocessorHost
* @param store the store being compacted * @param store the store being compacted
* @param scanner the scanner used to read store data during compaction * @param scanner the scanner used to read store data during compaction
* @param scanType type of Scan * @param scanType type of Scan
* @param request the compaction that will be executed * @param tracker used to track the life cycle of a compaction
* @throws IOException * @throws IOException
*/ */
public InternalScanner preCompact(final Store store, final InternalScanner scanner, public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType,
final ScanType scanType, final CompactionRequest request, final User user) CompactionLifeCycleTracker tracker, User user) throws IOException {
throws IOException {
return execOperationWithResult(false, scanner, return execOperationWithResult(false, scanner,
coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) { coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
@Override @Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException { throws IOException {
setResult(oserver.preCompact(ctx, store, getResult(), scanType, request)); setResult(oserver.preCompact(ctx, store, getResult(), scanType, tracker));
} }
}); });
} }
@ -582,16 +578,16 @@ public class RegionCoprocessorHost
* Called after the store compaction has completed. * Called after the store compaction has completed.
* @param store the store being compacted * @param store the store being compacted
* @param resultFile the new store file written during compaction * @param resultFile the new store file written during compaction
* @param request the compaction that is being executed * @param tracker used to track the life cycle of a compaction
* @throws IOException * @throws IOException
*/ */
public void postCompact(final Store store, final StoreFile resultFile, public void postCompact(Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker,
final CompactionRequest request, final User user) throws IOException { User user) throws IOException {
execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
@Override @Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException { throws IOException {
oserver.postCompact(ctx, store, resultFile, request); oserver.postCompact(ctx, store, resultFile, tracker);
} }
}); });
} }
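Note: a sketch of how a coprocessor written against the new hook signatures might look; the class name is invented for illustration and the remaining RegionObserver methods are omitted.

    public class TrackerAwareObserver implements RegionObserver {
      @Override
      public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
          Store store, InternalScanner scanner, ScanType scanType,
          CompactionLifeCycleTracker tracker) throws IOException {
        // the CompactionRequest is no longer passed in; only the tracker is exposed
        return scanner;
      }

      @Override
      public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
          StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
        // react to the finished compaction, e.g. record resultFile.getPath()
      }
    }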
View File
@ -63,11 +63,6 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
*/ */
List<WAL> getWALs() throws IOException; List<WAL> getWALs() throws IOException;
/**
* @return Implementation of {@link CompactionRequestor} or null.
*/
CompactionRequestor getCompactionRequester();
/** /**
* @return Implementation of {@link FlushRequester} or null. * @return Implementation of {@link FlushRequester} or null.
*/ */
View File
@ -76,7 +76,7 @@ public abstract class RegionSplitPolicy extends Configured {
if (explicitSplitPoint != null) { if (explicitSplitPoint != null) {
return explicitSplitPoint; return explicitSplitPoint;
} }
List<Store> stores = region.getStores(); List<HStore> stores = region.getStores();
byte[] splitPointFromLargestStore = null; byte[] splitPointFromLargestStore = null;
long largestStoreSize = 0; long largestStoreSize = 0;
View File
@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.Optional;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
@ -51,7 +53,8 @@ import org.apache.hadoop.hbase.security.User;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver { public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
/* The default priority for user-specified compaction requests. /**
* The default priority for user-specified compaction requests.
* The user gets top priority unless we have blocking compactions. (Pri <= 0) * The user gets top priority unless we have blocking compactions. (Pri <= 0)
*/ */
int PRIORITY_USER = 1; int PRIORITY_USER = 1;
@ -253,17 +256,12 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
*/ */
CompactionProgress getCompactionProgress(); CompactionProgress getCompactionProgress();
CompactionContext requestCompaction() throws IOException; default Optional<CompactionContext> requestCompaction() throws IOException {
return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);
}
/** Optional<CompactionContext> requestCompaction(int priority, CompactionLifeCycleTracker tracker,
* @deprecated see requestCompaction(int, CompactionRequest, User) User user) throws IOException;
*/
@Deprecated
CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
throws IOException;
CompactionContext requestCompaction(int priority, CompactionRequest baseRequest, User user)
throws IOException;
void cancelRequestedCompaction(CompactionContext compaction); void cancelRequestedCompaction(CompactionContext compaction);
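Note: a hedged caller sketch for the reworked Store API; the null check on the old return value becomes Optional handling. The surrounding variables are assumptions, not code from this patch.

    Optional<CompactionContext> compaction = store.requestCompaction();
    if (!compaction.isPresent()) {
      return;                        // writes disabled or no files selected
    }
    CompactionContext context = compaction.get();
    // either run the compaction, or release the selected files again:
    store.cancelRequestedCompaction(context);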
View File
@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* Used to track compaction execution.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface CompactionLifeCycleTracker {
static CompactionLifeCycleTracker DUMMY = new CompactionLifeCycleTracker() {
};
/**
* Called before compaction is executed by CompactSplitThread.
* <p>
* Requesting compaction on a region can lead to multiple compactions on different stores, so we
* will pass the {@link Store} in to tell you the store we operate on.
*/
default void beforeExecute(Store store) {
}
/**
* Called after compaction is executed by CompactSplitThread.
* <p>
* Requesting compaction on a region can lead to multiple compactions on different stores, so we
* will pass the {@link Store} in to tell you the store we operate on.
*/
default void afterExecute(Store store) {
}
}
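Note: an illustrative tracker built on the two default methods above; the latch and store count are assumptions used only to show the callback shape.

    CountDownLatch done = new CountDownLatch(numberOfStores);
    CompactionLifeCycleTracker tracker = new CompactionLifeCycleTracker() {
      @Override
      public void beforeExecute(Store store) {
        // called once per store before CompactSplitThread runs the compaction
      }
      @Override
      public void afterExecute(Store store) {
        done.countDown();            // called once per store after the compaction
      }
    };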
View File
@ -18,25 +18,21 @@
*/ */
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
/** /**
* This class holds all logical details necessary to run a compaction. * This class holds all logical details necessary to run a compaction.
*/ */
@InterfaceAudience.LimitedPrivate({ "coprocessor" }) @InterfaceAudience.Private
@InterfaceStability.Evolving
public class CompactionRequest { public class CompactionRequest {
// was this compaction promoted to an off-peak // was this compaction promoted to an off-peak
@ -53,58 +49,20 @@ public class CompactionRequest {
private String regionName = ""; private String regionName = "";
private String storeName = ""; private String storeName = "";
private long totalSize = -1L; private long totalSize = -1L;
private CompactionLifeCycleTracker tracker = CompactionLifeCycleTracker.DUMMY;
/**
* This ctor should be used by coprocessors that want to subclass CompactionRequest.
*/
public CompactionRequest() {
this.selectionTime = EnvironmentEdgeManager.currentTime();
this.timeInNanos = System.nanoTime();
}
public CompactionRequest(Collection<StoreFile> files) { public CompactionRequest(Collection<StoreFile> files) {
this(); this.selectionTime = EnvironmentEdgeManager.currentTime();
Preconditions.checkNotNull(files); this.timeInNanos = System.nanoTime();
this.filesToCompact = files; this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not null");
recalculateSize(); recalculateSize();
} }
public void updateFiles(Collection<StoreFile> files) { public void updateFiles(Collection<StoreFile> files) {
this.filesToCompact = files; this.filesToCompact = Preconditions.checkNotNull(files, "files for compaction can not be null");
recalculateSize(); recalculateSize();
} }
/**
* Called before compaction is executed by CompactSplitThread; for use by coproc subclasses.
*/
public void beforeExecute() {}
/**
* Called after compaction is executed by CompactSplitThread; for use by coproc subclasses.
*/
public void afterExecute() {}
/**
* Combines the request with other request. Coprocessors subclassing CR may override
* this if they want to do clever things based on CompactionPolicy selection that
* is passed to this method via "other". The default implementation just does a copy.
* @param other Request to combine with.
* @return The result (may be "this" or "other").
*/
public CompactionRequest combineWith(CompactionRequest other) {
this.filesToCompact = new ArrayList<>(other.getFiles());
this.isOffPeak = other.isOffPeak;
this.isMajor = other.isMajor;
this.priority = other.priority;
this.selectionTime = other.selectionTime;
this.timeInNanos = other.timeInNanos;
this.regionName = other.regionName;
this.storeName = other.storeName;
this.totalSize = other.totalSize;
recalculateSize();
return this;
}
public Collection<StoreFile> getFiles() { public Collection<StoreFile> getFiles() {
return this.filesToCompact; return this.filesToCompact;
} }
@ -168,6 +126,14 @@ public class CompactionRequest {
: (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES); : (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
} }
public void setTracker(CompactionLifeCycleTracker tracker) {
this.tracker = tracker;
}
public CompactionLifeCycleTracker getTracker() {
return tracker;
}
@Override @Override
public String toString() { public String toString() {
String fsList = filesToCompact.stream().filter(f -> f.getReader() != null) String fsList = filesToCompact.stream().filter(f -> f.getReader() != null)
@ -186,12 +152,7 @@ public class CompactionRequest {
* @param files files that should be included in the compaction * @param files files that should be included in the compaction
*/ */
private void recalculateSize() { private void recalculateSize() {
long sz = 0; this.totalSize = filesToCompact.stream().map(StoreFile::getReader)
for (StoreFile sf : this.filesToCompact) { .mapToLong(r -> r != null ? r.length() : 0L).sum();
StoreFileReader r = sf.getReader();
sz += r == null ? 0 : r.length();
}
this.totalSize = sz;
} }
} }
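Note: a rough sketch of the new plumbing, assuming a set of selected files and a caller-supplied tracker: the server attaches the tracker to the internal request, and downstream code (for example the Compactor, below) reads it back instead of handing the request itself to coprocessors.

    CompactionRequest request = new CompactionRequest(selectedFiles);
    request.setPriority(Store.PRIORITY_USER);
    request.setTracker(tracker);
    // later, only the tracker crosses the coprocessor boundary:
    CompactionLifeCycleTracker t = request.getTracker();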
View File
@ -338,14 +338,14 @@ public abstract class Compactor<T extends CellSink> {
* @param readPoint the read point to help create scanner by Coprocessor if required. * @param readPoint the read point to help create scanner by Coprocessor if required.
* @return Scanner override by coprocessor; null if not overriding. * @return Scanner override by coprocessor; null if not overriding.
*/ */
protected InternalScanner preCreateCoprocScanner(final CompactionRequest request, protected InternalScanner preCreateCoprocScanner(CompactionRequest request, ScanType scanType,
final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners, long earliestPutTs, List<StoreFileScanner> scanners, User user, long readPoint)
User user, final long readPoint) throws IOException { throws IOException {
if (store.getCoprocessorHost() == null) { if (store.getCoprocessorHost() == null) {
return null; return null;
} }
return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType, return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
earliestPutTs, request, user, readPoint); earliestPutTs, request.getTracker(), user, readPoint);
} }
/** /**
@ -355,12 +355,13 @@ public abstract class Compactor<T extends CellSink> {
* @param scanner The default scanner created for compaction. * @param scanner The default scanner created for compaction.
* @return Scanner scanner to use (usually the default); null if compaction should not proceed. * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
*/ */
protected InternalScanner postCreateCoprocScanner(final CompactionRequest request, protected InternalScanner postCreateCoprocScanner(CompactionRequest request, ScanType scanType,
final ScanType scanType, final InternalScanner scanner, User user) throws IOException { InternalScanner scanner, User user) throws IOException {
if (store.getCoprocessorHost() == null) { if (store.getCoprocessorHost() == null) {
return scanner; return scanner;
} }
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request, user); return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
user);
} }
/** /**
View File
@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@ -1532,7 +1533,8 @@ public class AccessController implements MasterObserver, RegionObserver, RegionS
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException { InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
throws IOException {
requirePermission(getActiveUser(c), "compact", getTableName(c.getEnvironment()), null, null, requirePermission(getActiveUser(c), "compact", getTableName(c.getEnvironment()), null, null,
Action.ADMIN, Action.CREATE); Action.ADMIN, Action.CREATE);
return scanner; return scanner;
View File
@ -92,7 +92,7 @@
</div> </div>
<% if(region != null) { // <% if(region != null) { //
List<Store> stores = region.getStores(); List<? extends Store> stores = region.getStores();
for (Store store : stores) { for (Store store : stores) {
String cf = store.getColumnFamilyName(); String cf = store.getColumnFamilyName();
Collection<StoreFile> storeFiles = store.getStorefiles(); %> Collection<StoreFile> storeFiles = store.getStorefiles(); %>
View File
@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;
import com.google.protobuf.Service;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collections; import java.util.Collections;
@ -36,11 +38,8 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.Leases;
@ -51,14 +50,14 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager; import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import com.google.protobuf.Service;
/** /**
 * Basic mock region server services. Should only be instantiated by HBaseTestingUtility.  * Basic mock region server services. Should only be instantiated by HBaseTestingUtility.
*/ */
@ -159,11 +158,6 @@ public class MockRegionServerServices implements RegionServerServices {
return null; return null;
} }
@Override
public CompactionRequestor getCompactionRequester() {
return null;
}
@Override @Override
public ClusterConnection getConnection() { public ClusterConnection getConnection() {
return null; return null;
View File
@ -124,7 +124,7 @@ public class TestIOFencing {
} }
@Override @Override
public boolean compact(CompactionContext compaction, Store store, public boolean compact(CompactionContext compaction, HStore store,
ThroughputController throughputController) throws IOException { ThroughputController throughputController) throws IOException {
try { try {
return super.compact(compaction, store, throughputController); return super.compact(compaction, store, throughputController);
@ -134,7 +134,7 @@ public class TestIOFencing {
} }
@Override @Override
public boolean compact(CompactionContext compaction, Store store, public boolean compact(CompactionContext compaction, HStore store,
ThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
try { try {
return super.compact(compaction, store, throughputController, user); return super.compact(compaction, store, throughputController, user);
View File
@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -256,7 +256,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
@Override @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionRequest request, long readPoint) throws IOException { InternalScanner s, CompactionLifeCycleTracker request, long readPoint) throws IOException {
return createCompactorScanner(store, scanners, scanType, earliestPutTs); return createCompactorScanner(store, scanners, scanType, earliestPutTs);
} }
View File
@ -784,7 +784,7 @@ public class TestBlockEvictionFromClient {
} }
private BlockCache setCacheProperties(Region region) { private BlockCache setCacheProperties(Region region) {
Iterator<Store> strItr = region.getStores().iterator(); Iterator<? extends Store> strItr = region.getStores().iterator();
BlockCache cache = null; BlockCache cache = null;
while (strItr.hasNext()) { while (strItr.hasNext()) {
Store store = strItr.next(); Store store = strItr.next();
View File
@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
@ -203,20 +204,20 @@ public class SimpleRegionObserver implements RegionObserver {
@Override @Override
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<StoreFile> candidates, CompactionRequest request) throws IOException { List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {
ctPreCompactSelect.incrementAndGet(); ctPreCompactSelect.incrementAndGet();
} }
@Override @Override
public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ImmutableList<StoreFile> selected, CompactionRequest request) { ImmutableList<StoreFile> selected, CompactionLifeCycleTracker tracker) {
ctPostCompactSelect.incrementAndGet(); ctPostCompactSelect.incrementAndGet();
} }
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException { InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
throws IOException {
ctPreCompact.incrementAndGet(); ctPreCompact.incrementAndGet();
return scanner; return scanner;
} }
@ -224,14 +225,14 @@ public class SimpleRegionObserver implements RegionObserver {
@Override @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionRequest request, long readPoint) throws IOException { InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
ctPreCompactScanner.incrementAndGet(); ctPreCompactScanner.incrementAndGet();
return s; return s;
} }
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
StoreFile resultFile, CompactionRequest request) throws IOException { StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
ctPostCompact.incrementAndGet(); ctPostCompact.incrementAndGet();
} }
View File
@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -194,13 +195,13 @@ public class TestCoprocessorInterface {
} }
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, InternalScanner scanner, ScanType scanType, CompactionRequest request) { Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) {
preCompactCalled = true; preCompactCalled = true;
return scanner; return scanner;
} }
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, StoreFile resultFile, CompactionRequest request) { Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker) {
postCompactCalled = true; postCompactCalled = true;
} }
View File

@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
@ -417,7 +417,7 @@ public class TestRegionObserverInterface {
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
final InternalScanner scanner, final ScanType scanType, CompactionRequest request) { InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker) {
return new InternalScanner() { return new InternalScanner() {
@Override @Override
public boolean next(List<Cell> results) throws IOException { public boolean next(List<Cell> results) throws IOException {
@ -456,7 +456,7 @@ public class TestRegionObserverInterface {
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
StoreFile resultFile, CompactionRequest request) { StoreFile resultFile, CompactionLifeCycleTracker tracker) {
lastCompaction = EnvironmentEdgeManager.currentTime(); lastCompaction = EnvironmentEdgeManager.currentTime();
} }
View File
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -155,7 +157,7 @@ public class TestRegionObserverScannerOpenHook {
@Override @Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s, CompactionRequest request, long readPoint) long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint)
throws IOException { throws IOException {
scanners.forEach(KeyValueScanner::close); scanners.forEach(KeyValueScanner::close);
return NO_DATA; return NO_DATA;
@ -252,7 +254,7 @@ public class TestRegionObserverScannerOpenHook {
} }
@Override @Override
public boolean compact(CompactionContext compaction, Store store, public boolean compact(CompactionContext compaction, HStore store,
ThroughputController throughputController) throws IOException { ThroughputController throughputController) throws IOException {
boolean ret = super.compact(compaction, store, throughputController); boolean ret = super.compact(compaction, store, throughputController);
if (ret) compactionStateChangeLatch.countDown(); if (ret) compactionStateChangeLatch.countDown();
@ -260,7 +262,7 @@ public class TestRegionObserverScannerOpenHook {
} }
@Override @Override
public boolean compact(CompactionContext compaction, Store store, public boolean compact(CompactionContext compaction, HStore store,
ThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
boolean ret = super.compact(compaction, store, throughputController, user); boolean ret = super.compact(compaction, store, throughputController, user);
if (ret) compactionStateChangeLatch.countDown(); if (ret) compactionStateChangeLatch.countDown();
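
For comparison, a minimal sketch of the tracker-based preCompactScannerOpen hook (class name hypothetical, signature copied from the hunk above); returning null to keep the default compaction scanner is my reading of the hook contract, not something stated in this diff:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;

// Hypothetical observer: same hook as above, but it leaves scanner creation to the default path.
public class DefaultCompactScannerObserver implements RegionObserver {
  @Override
  public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
      long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint)
      throws IOException {
    // Returning null (assumed default behaviour) lets HBase build the usual StoreScanner;
    // the test above instead closes the scanners and returns a canned no-data scanner.
    return null;
  }
}
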

View File

@ -47,6 +47,27 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
@ -61,10 +82,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
@ -102,27 +123,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRespon
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/** /**
* A mock RegionServer implementation. * A mock RegionServer implementation.
@ -314,12 +314,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return false; return false;
} }
@Override
public CompactionRequestor getCompactionRequester() {
// TODO Auto-generated method stub
return null;
}
@Override @Override
public FlushRequester getFlushRequester() { public FlushRequester getFlushRequester() {
// TODO Auto-generated method stub // TODO Auto-generated method stub

View File

@ -86,7 +86,7 @@ import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -716,14 +716,14 @@ public class TestMobCompactor {
} }
/** /**
* This copro overwrites the default compaction policy. It always chooses two latest * This copro overwrites the default compaction policy. It always chooses two latest hfiles and
* hfiles and compacts them into a new one. * compacts them into a new one.
*/ */
public static class CompactTwoLatestHfilesCopro implements RegionObserver { public static class CompactTwoLatestHfilesCopro implements RegionObserver {
@Override @Override
public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
final Store store, final List<StoreFile> candidates, final CompactionRequest request) List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {
throws IOException {
int count = candidates.size(); int count = candidates.size();
if (count >= 2) { if (count >= 2) {
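
A minimal sketch of the tracker-based selection hook (class name hypothetical; the signature and the keep-the-two-newest idea come from the copro above, and the oldest-first ordering of candidates is an assumption):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;

// Hypothetical selection policy: trim the candidate list so only the two newest files compact.
public class KeepTwoNewestSelection implements RegionObserver {
  @Override
  public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException {
    // Assumes candidates are ordered oldest first; drop everything except the last two.
    while (candidates.size() > 2) {
      candidates.remove(0);
    }
  }
}
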

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -461,7 +462,7 @@ public class TestNamespaceAuditor {
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
StoreFile resultFile, CompactionRequest request) throws IOException { StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {
postCompact.countDown(); postCompact.countDown();
} }

View File

@ -36,9 +36,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -324,7 +324,7 @@ public class TestFileSystemUtilizationChore {
final HRegionInfo info = mock(HRegionInfo.class); final HRegionInfo info = mock(HRegionInfo.class);
when(r.getRegionInfo()).thenReturn(info); when(r.getRegionInfo()).thenReturn(info);
List<Store> stores = new ArrayList<>(); List<Store> stores = new ArrayList<>();
when(r.getStores()).thenReturn(stores); when(r.getStores()).thenReturn((List) stores);
for (Long storeSize : storeSizes) { for (Long storeSize : storeSizes) {
final Store s = mock(Store.class); final Store s = mock(Store.class);
stores.add(s); stores.add(s);
@ -338,7 +338,7 @@ public class TestFileSystemUtilizationChore {
final HRegionInfo info = mock(HRegionInfo.class); final HRegionInfo info = mock(HRegionInfo.class);
when(r.getRegionInfo()).thenReturn(info); when(r.getRegionInfo()).thenReturn(info);
List<Store> stores = new ArrayList<>(); List<Store> stores = new ArrayList<>();
when(r.getStores()).thenReturn(stores); when(r.getStores()).thenReturn((List) stores);
assertEquals( assertEquals(
"Logic error, storeSizes and linkSizes must be equal in size", storeSizes.size(), "Logic error, storeSizes and linkSizes must be equal in size", storeSizes.size(),
hfileSizes.size()); hfileSizes.size());
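
The (List) cast above is needed because the mocked getStores() now declares a narrower element type while the test still builds Store mocks; a self-contained sketch of that pattern (helper name hypothetical, and the exact generic bound on Region.getStores() is an assumption):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;

// Hypothetical test helper: keep stubbing with Store mocks even though getStores()
// now returns a narrower element type, by going through a raw List.
final class RegionMocks {
  @SuppressWarnings({ "unchecked", "rawtypes" })
  static Region regionWithStores(int storeCount) {
    Region region = mock(Region.class);
    List<Store> stores = new ArrayList<>();
    for (int i = 0; i < storeCount; i++) {
      stores.add(mock(Store.class));
    }
    when(region.getStores()).thenReturn((List) stores);
    return region;
  }
}
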

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.TestFromClientSideWithCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
/** /**
@ -61,7 +62,7 @@ public class NoOpScanPolicyObserver implements RegionObserver {
public InternalScanner preCompactScannerOpen( public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionRequest request, long readPoint) throws IOException { InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
// this demonstrates how to override the scanners default behavior // this demonstrates how to override the scanners default behavior
ScanInfo oldSI = store.getScanInfo(); ScanInfo oldSI = store.getScanInfo();
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(),

View File

@ -18,11 +18,16 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.mockito.Matchers.*; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*; import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Optional;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -33,15 +38,23 @@ import org.mockito.stubbing.Answer;
*/ */
public class StatefulStoreMockMaker { public class StatefulStoreMockMaker {
// Add and expand the methods and answers as needed. // Add and expand the methods and answers as needed.
public CompactionContext selectCompaction() { return null; } public Optional<CompactionContext> selectCompaction() {
public void cancelCompaction(Object originalContext) {} return Optional.empty();
public int getPriority() { return 0; } }
private class SelectAnswer implements Answer<CompactionContext> { public void cancelCompaction(Object originalContext) {
public CompactionContext answer(InvocationOnMock invocation) throws Throwable { }
public int getPriority() {
return 0;
}
private class SelectAnswer implements Answer<Optional<CompactionContext>> {
public Optional<CompactionContext> answer(InvocationOnMock invocation) throws Throwable {
return selectCompaction(); return selectCompaction();
} }
} }
private class PriorityAnswer implements Answer<Integer> { private class PriorityAnswer implements Answer<Integer> {
public Integer answer(InvocationOnMock invocation) throws Throwable { public Integer answer(InvocationOnMock invocation) throws Throwable {
return getPriority(); return getPriority();
@ -53,15 +66,13 @@ public class StatefulStoreMockMaker {
} }
} }
public Store createStoreMock(String name) throws Exception { public HStore createStoreMock(String name) throws Exception {
Store store = mock(Store.class, name); HStore store = mock(HStore.class, name);
when(store.requestCompaction( when(store.requestCompaction(anyInt(), any(CompactionLifeCycleTracker.class), any(User.class)))
anyInt(), isNull(CompactionRequest.class))).then(new SelectAnswer()); .then(new SelectAnswer());
when(store.requestCompaction(
anyInt(), isNull(CompactionRequest.class), any(User.class))).then(new SelectAnswer());
when(store.getCompactPriority()).then(new PriorityAnswer()); when(store.getCompactPriority()).then(new PriorityAnswer());
doAnswer(new CancelAnswer()).when( doAnswer(new CancelAnswer()).when(store)
store).cancelRequestedCompaction(any(CompactionContext.class)); .cancelRequestedCompaction(any(CompactionContext.class));
return store; return store;
} }
} }
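
A condensed, stand-alone sketch of the same mock wiring (helper name hypothetical; the stubbed signature and the Optional return type are taken from the class above):

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Optional;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.security.User;

// Hypothetical helper: an HStore mock whose compaction selection is fixed up front.
// Optional.empty() now stands in for the old "null means nothing to compact" convention.
final class StoreMocks {
  static HStore storeSelecting(Optional<CompactionContext> selection) throws Exception {
    HStore store = mock(HStore.class);
    when(store.requestCompaction(anyInt(), any(CompactionLifeCycleTracker.class), any(User.class)))
        .thenReturn(selection);
    return store;
  }
}
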

View File

@ -33,8 +33,8 @@ import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
@ -65,7 +66,6 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After; import org.junit.After;
@ -298,15 +298,16 @@ public class TestCompaction {
Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
// setup a region/store with some files // setup a region/store with some files
Store store = r.getStore(COLUMN_FAMILY); HStore store = r.getStore(COLUMN_FAMILY);
createStoreFile(r); createStoreFile(r);
for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
createStoreFile(r); createStoreFile(r);
} }
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
TrackableCompactionRequest request = new TrackableCompactionRequest(latch); Tracker tracker = new Tracker(latch);
thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request,null); thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, tracker,
null);
// wait for the latch to complete. // wait for the latch to complete.
latch.await(); latch.await();
@ -322,7 +323,7 @@ public class TestCompaction {
Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
// setup a region/store with some files // setup a region/store with some files
Store store = r.getStore(COLUMN_FAMILY); HStore store = r.getStore(COLUMN_FAMILY);
createStoreFile(r); createStoreFile(r);
for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
createStoreFile(r); createStoreFile(r);
@ -337,9 +338,9 @@ public class TestCompaction {
long preFailedCount = metricsWrapper.getNumCompactionsFailed(); long preFailedCount = metricsWrapper.getNumCompactionsFailed();
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
TrackableCompactionRequest request = new TrackableCompactionRequest(latch); Tracker tracker = new Tracker(latch);
thread.requestCompaction(mockRegion, store, "test custom comapction", Store.PRIORITY_USER, thread.requestCompaction(mockRegion, store, "test custom comapction", Store.PRIORITY_USER,
request, null); tracker, null);
// wait for the latch to complete. // wait for the latch to complete.
latch.await(120, TimeUnit.SECONDS); latch.await(120, TimeUnit.SECONDS);
@ -370,20 +371,17 @@ public class TestCompaction {
// setup a region/store with some files // setup a region/store with some files
int numStores = r.getStores().size(); int numStores = r.getStores().size();
List<Pair<CompactionRequest, Store>> requests = new ArrayList<>(numStores);
CountDownLatch latch = new CountDownLatch(numStores); CountDownLatch latch = new CountDownLatch(numStores);
Tracker tracker = new Tracker(latch);
// create some store files and setup requests for each store on which we want to do a // create some store files and setup requests for each store on which we want to do a
// compaction // compaction
for (Store store : r.getStores()) { for (HStore store : r.getStores()) {
createStoreFile(r, store.getColumnFamilyName()); createStoreFile(r, store.getColumnFamilyName());
createStoreFile(r, store.getColumnFamilyName()); createStoreFile(r, store.getColumnFamilyName());
createStoreFile(r, store.getColumnFamilyName()); createStoreFile(r, store.getColumnFamilyName());
requests.add(new Pair<>(new TrackableCompactionRequest(latch), store)); thread.requestCompaction(r, store, "test mulitple custom comapctions", Store.PRIORITY_USER,
tracker, null);
} }
thread.requestCompaction(r, "test mulitple custom comapctions", Store.PRIORITY_USER,
Collections.unmodifiableList(requests), null);
// wait for the latch to complete. // wait for the latch to complete.
latch.await(); latch.await();
@ -428,7 +426,7 @@ public class TestCompaction {
} }
@Override @Override
public synchronized CompactionContext selectCompaction() { public synchronized Optional<CompactionContext> selectCompaction() {
CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting)); CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
compacting.addAll(notCompacting); compacting.addAll(notCompacting);
notCompacting.clear(); notCompacting.clear();
@ -437,7 +435,7 @@ public class TestCompaction {
} catch (IOException ex) { } catch (IOException ex) {
fail("Shouldn't happen"); fail("Shouldn't happen");
} }
return ctx; return Optional.of(ctx);
} }
@Override @Override
@ -499,14 +497,14 @@ public class TestCompaction {
} }
@Override @Override
public CompactionContext selectCompaction() { public Optional<CompactionContext> selectCompaction() {
this.blocked = new BlockingCompactionContext(); this.blocked = new BlockingCompactionContext();
try { try {
this.blocked.select(null, false, false, false); this.blocked.select(null, false, false, false);
} catch (IOException ex) { } catch (IOException ex) {
fail("Shouldn't happen"); fail("Shouldn't happen");
} }
return this.blocked; return Optional.of(blocked);
} }
@Override @Override
@ -527,13 +525,13 @@ public class TestCompaction {
} }
@Override @Override
public Store createStoreMock(String name) throws Exception { public HStore createStoreMock(String name) throws Exception {
return createStoreMock(Integer.MIN_VALUE, name); return createStoreMock(Integer.MIN_VALUE, name);
} }
public Store createStoreMock(int priority, String name) throws Exception { public HStore createStoreMock(int priority, String name) throws Exception {
// Override the mock to always return the specified priority. // Override the mock to always return the specified priority.
Store s = super.createStoreMock(name); HStore s = super.createStoreMock(name);
when(s.getCompactPriority()).thenReturn(priority); when(s.getCompactPriority()).thenReturn(priority);
return s; return s;
} }
@ -555,7 +553,7 @@ public class TestCompaction {
// Set up the region mock that redirects compactions. // Set up the region mock that redirects compactions.
HRegion r = mock(HRegion.class); HRegion r = mock(HRegion.class);
when( when(
r.compact(any(CompactionContext.class), any(Store.class), r.compact(any(CompactionContext.class), any(HStore.class),
any(ThroughputController.class), any(User.class))).then(new Answer<Boolean>() { any(ThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
@Override @Override
public Boolean answer(InvocationOnMock invocation) throws Throwable { public Boolean answer(InvocationOnMock invocation) throws Throwable {
@ -568,7 +566,7 @@ public class TestCompaction {
// Set up store mocks for 2 "real" stores and the one we use for blocking CST. // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
ArrayList<Integer> results = new ArrayList<>(); ArrayList<Integer> results = new ArrayList<>();
StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results); StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2"); HStore store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
BlockingStoreMockMaker blocker = new BlockingStoreMockMaker(); BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
// First, block the compaction thread so that we could muck with queue. // First, block the compaction thread so that we could muck with queue.
@ -691,24 +689,20 @@ public class TestCompaction {
} }
/** /**
* Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes. * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
* finishes.
*/ */
public static class TrackableCompactionRequest extends CompactionRequest { public static class Tracker implements CompactionLifeCycleTracker {
private CountDownLatch done;
/** private final CountDownLatch done;
* Constructor for a custom compaction. Uses the setXXX methods to update the state of the
* compaction before being used. public Tracker(CountDownLatch done) {
*/ this.done = done;
public TrackableCompactionRequest(CountDownLatch finished) {
super();
this.done = finished;
} }
@Override @Override
public void afterExecute() { public void afterExecute(Store store) {
super.afterExecute(); done.countDown();
this.done.countDown();
} }
} }
} }
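
The Tracker above is tied to this test, but the same idea works as a small reusable latch; a sketch under those assumptions (class name hypothetical, afterExecute(Store) signature taken from the code above):

import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;

// Hypothetical reusable variant of the Tracker above: callers block until every
// compaction they registered for has run its afterExecute callback.
public class LatchTracker implements CompactionLifeCycleTracker {
  private final CountDownLatch done;

  public LatchTracker(int expectedCompactions) {
    this.done = new CountDownLatch(expectedCompactions);
  }

  @Override
  public void afterExecute(Store store) {
    done.countDown();
  }

  public void awaitCompletion() throws InterruptedException {
    done.await();
  }
}

Usage would mirror the test: pass one tracker to requestCompaction for each store, then call awaitCompletion() instead of holding the CountDownLatch directly.
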

View File

@ -28,6 +28,7 @@ import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
import javax.crypto.spec.SecretKeySpec; import javax.crypto.spec.SecretKeySpec;
@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -536,8 +538,9 @@ public class TestHMobStore {
// Trigger major compaction // Trigger major compaction
this.store.triggerMajorCompaction(); this.store.triggerMajorCompaction();
CompactionContext requestCompaction = this.store.requestCompaction(1, null); Optional<CompactionContext> requestCompaction =
this.store.compact(requestCompaction, NoLimitThroughputController.INSTANCE, null); this.store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
this.store.compact(requestCompaction.get(), NoLimitThroughputController.INSTANCE, null);
Assert.assertEquals(1, this.store.getStorefiles().size()); Assert.assertEquals(1, this.store.getStorefiles().size());
//Check encryption after compaction //Check encryption after compaction
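
Pieced together from the calls above, a minimal sketch of how a caller now drives a user-priority major compaction (helper name hypothetical; requestCompaction returns an Optional that is empty when nothing was selected):

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;

// Hypothetical helper: trigger, select and run a major compaction on one store.
final class CompactNow {
  static boolean majorCompact(HStore store) throws IOException {
    store.triggerMajorCompaction();
    Optional<CompactionContext> selection =
        store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null);
    if (!selection.isPresent()) {
      return false; // nothing was selected, e.g. no store files to compact
    }
    store.compact(selection.get(), NoLimitThroughputController.INSTANCE, null);
    return true;
  }
}
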

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
@ -254,7 +255,8 @@ public class TestHRegionServerBulkLoad {
static int sleepDuration; static int sleepDuration;
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException { InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
throws IOException {
try { try {
Thread.sleep(sleepDuration); Thread.sleep(sleepDuration);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
@ -417,7 +418,7 @@ public class TestMajorCompaction {
} }
store.triggerMajorCompaction(); store.triggerMajorCompaction();
CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest(); CompactionRequest request = store.requestCompaction().get().getRequest();
assertNotNull("Expected to receive a compaction request", request); assertNotNull("Expected to receive a compaction request", request);
assertEquals( assertEquals(
"System-requested major compaction should not occur if there are too many store files", "System-requested major compaction should not occur if there are too many store files",
@ -436,7 +437,9 @@ public class TestMajorCompaction {
createStoreFile(r); createStoreFile(r);
} }
store.triggerMajorCompaction(); store.triggerMajorCompaction();
CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest(); CompactionRequest request =
store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, null).get()
.getRequest();
assertNotNull("Expected to receive a compaction request", request); assertNotNull("Expected to receive a compaction request", request);
assertEquals( assertEquals(
"User-requested major compaction should always occur, even if there are too many store files", "User-requested major compaction should always occur, even if there are too many store files",

View File

@ -31,6 +31,7 @@ import java.io.InterruptedIOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -70,7 +71,11 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver; import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.*; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.NoSuchProcedureException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.assignment.RegionStates;
@ -234,7 +239,7 @@ public class TestSplitTransactionOnCluster {
assertEquals(1, cluster.getRegions(tableName).size()); assertEquals(1, cluster.getRegions(tableName).size());
HRegion region = cluster.getRegions(tableName).get(0); HRegion region = cluster.getRegions(tableName).get(0);
Store store = region.getStore(cf); HStore store = region.getStore(cf);
int regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName()); int regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName());
HRegionServer regionServer = cluster.getRegionServer(regionServerIndex); HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
@ -246,8 +251,8 @@ public class TestSplitTransactionOnCluster {
int fileNum = store.getStorefiles().size(); int fileNum = store.getStorefiles().size();
// 0, Compaction Request // 0, Compaction Request
store.triggerMajorCompaction(); store.triggerMajorCompaction();
CompactionContext cc = store.requestCompaction(); Optional<CompactionContext> cc = store.requestCompaction();
assertNotNull(cc); assertTrue(cc.isPresent());
// 1, A timeout split // 1, A timeout split
// 1.1 close region // 1.1 close region
assertEquals(2, region.close(false).get(cf).size()); assertEquals(2, region.close(false).get(cf).size());
@ -255,7 +260,7 @@ public class TestSplitTransactionOnCluster {
region.initialize(); region.initialize();
// 2, Run Compaction cc // 2, Run Compaction cc
assertFalse(region.compact(cc, store, NoLimitThroughputController.INSTANCE)); assertFalse(region.compact(cc.get(), store, NoLimitThroughputController.INSTANCE));
assertTrue(fileNum > store.getStorefiles().size()); assertTrue(fileNum > store.getStorefiles().size());
// 3, Split // 3, Split

View File

@ -117,7 +117,7 @@ public class TestSplitWalDataLoss {
} }
}).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(), }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(), Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
Matchers.<Collection<Store>> any()); Matchers.<Collection<HStore>> any());
// Find region key; don't pick up key for hbase:meta by mistake. // Find region key; don't pick up key for hbase:meta by mistake.
String key = null; String key = null;
for (Map.Entry<String, Region> entry: rs.onlineRegions.entrySet()) { for (Map.Entry<String, Region> entry: rs.onlineRegions.entrySet()) {

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -47,7 +46,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -74,6 +72,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -105,12 +106,6 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** /**
* Test class for the Store * Test class for the Store
*/ */
@ -365,7 +360,7 @@ public class TestStore {
// There will be no compaction due to threshold above. Last file will not be replaced. // There will be no compaction due to threshold above. Last file will not be replaced.
for (int i = 1; i <= storeFileNum - 1; i++) { for (int i = 1; i <= storeFileNum - 1; i++) {
// verify the expired store file. // verify the expired store file.
assertNull(this.store.requestCompaction()); assertFalse(this.store.requestCompaction().isPresent());
Collection<StoreFile> sfs = this.store.getStorefiles(); Collection<StoreFile> sfs = this.store.getStorefiles();
// Ensure i files are gone. // Ensure i files are gone.
if (minVersions == 0) { if (minVersions == 0) {
@ -380,7 +375,7 @@ public class TestStore {
// Let the next store file expired. // Let the next store file expired.
edge.incrementTime(sleepTime); edge.incrementTime(sleepTime);
} }
assertNull(this.store.requestCompaction()); assertFalse(this.store.requestCompaction().isPresent());
Collection<StoreFile> sfs = this.store.getStorefiles(); Collection<StoreFile> sfs = this.store.getStorefiles();
// Assert the last expired file is not removed. // Assert the last expired file is not removed.
@ -416,7 +411,7 @@ public class TestStore {
Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
// after compact; check the lowest time stamp // after compact; check the lowest time stamp
store.compact(store.requestCompaction(), NoLimitThroughputController.INSTANCE, null); store.compact(store.requestCompaction().get(), NoLimitThroughputController.INSTANCE, null);
lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
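
The assertNull checks in this test become Optional presence checks; a small sketch of both directions (helper names hypothetical):

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;

// Hypothetical assertion helpers for the null-to-Optional change shown above.
final class CompactionAssertions {
  static void assertNothingSelected(HStore store) throws IOException {
    // Was: assertNull(store.requestCompaction())
    assertFalse(store.requestCompaction().isPresent());
  }

  static CompactionContext assertSomethingSelected(HStore store) throws IOException {
    Optional<CompactionContext> selection = store.requestCompaction();
    assertTrue(selection.isPresent());
    return selection.get();
  }
}
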

View File

@ -823,12 +823,12 @@ public abstract class AbstractTestWALReplay {
final HRegion region = final HRegion region =
new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
@Override @Override
protected FlushResult internalFlushcache(final WAL wal, final long myseqid, protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
final Collection<Store> storesToFlush, MonitoredTask status, final Collection<HStore> storesToFlush, MonitoredTask status,
boolean writeFlushWalMarker) boolean writeFlushWalMarker)
throws IOException { throws IOException {
LOG.info("InternalFlushCache Invoked"); LOG.info("InternalFlushCache Invoked");
FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush, FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
Mockito.mock(MonitoredTask.class), writeFlushWalMarker); Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
flushcount.incrementAndGet(); flushcount.incrementAndGet();
return fs; return fs;

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -263,7 +264,7 @@ public class TestCoprocessorScanPolicy {
public InternalScanner preCompactScannerOpen( public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionRequest request, long readPoint) throws IOException { InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException {
Long newTtl = ttls.get(store.getTableName()); Long newTtl = ttls.get(store.getTableName());
Integer newVersions = versions.get(store.getTableName()); Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = store.getScanInfo(); ScanInfo oldSI = store.getScanInfo();