HBASE-14969 Add throughput controller for flush

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Yu Li 2016-01-29 09:30:20 +08:00 committed by zhangduo
parent 14dd959aa2
commit b3b1ce99c6
44 changed files with 994 additions and 326 deletions

View File

@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
/** /**
@ -151,7 +151,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
@Override @Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, long smallestReadPoint, boolean cleanSeqId,
CompactionThroughputController throughputController, boolean major) throws IOException { ThroughputController throughputController, boolean major) throws IOException {
if (!(scanner instanceof MobCompactionStoreScanner)) { if (!(scanner instanceof MobCompactionStoreScanner)) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"The scanner should be an instance of MobCompactionStoreScanner"); "The scanner should be an instance of MobCompactionStoreScanner");

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -96,7 +97,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
*/ */
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status) throws IOException { MonitoredTask status, ThroughputController throughputController) throws IOException {
ArrayList<Path> result = new ArrayList<Path>(); ArrayList<Path> result = new ArrayList<Path>();
int cellsCount = snapshot.getCellsCount(); int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries if (cellsCount == 0) return result; // don't flush if there are no entries

View File

@ -40,8 +40,8 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
@ -89,7 +89,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
private final ThreadPoolExecutor splits; private final ThreadPoolExecutor splits;
private final ThreadPoolExecutor mergePool; private final ThreadPoolExecutor mergePool;
private volatile CompactionThroughputController compactionThroughputController; private volatile ThroughputController compactionThroughputController;
/** /**
* Splitting should not take place if the total number of regions exceed this. * Splitting should not take place if the total number of regions exceed this.
@ -672,7 +672,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
} }
} }
CompactionThroughputController old = this.compactionThroughputController; ThroughputController old = this.compactionThroughputController;
if (old != null) { if (old != null) {
old.stop("configuration change"); old.stop("configuration change");
} }
@ -717,7 +717,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
} }
@VisibleForTesting @VisibleForTesting
public CompactionThroughputController getCompactionThroughputController() { public ThroughputController getCompactionThroughputController() {
return compactionThroughputController; return compactionThroughputController;
} }

View File

@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.mapreduce.JobUtil; import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -164,7 +164,7 @@ public class CompactionTool extends Configured implements Tool {
CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null); CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
if (compaction == null) break; if (compaction == null) break;
List<StoreFile> storeFiles = List<StoreFile> storeFiles =
store.compact(compaction, NoLimitCompactionThroughputController.INSTANCE); store.compact(compaction, NoLimitThroughputController.INSTANCE);
if (storeFiles != null && !storeFiles.isEmpty()) { if (storeFiles != null && !storeFiles.isEmpty()) {
if (keepCompactedFiles && deleteCompacted) { if (keepCompactedFiles && deleteCompacted) {
for (StoreFile storeFile: storeFiles) { for (StoreFile storeFile: storeFiles) {

View File

@ -21,16 +21,16 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
@ -120,13 +120,13 @@ public class DefaultStoreEngine extends StoreEngine<
} }
@Override @Override
public List<Path> compact(CompactionThroughputController throughputController) public List<Path> compact(ThroughputController throughputController)
throws IOException { throws IOException {
return compact(throughputController, null); return compact(throughputController, null);
} }
@Override @Override
public List<Path> compact(CompactionThroughputController throughputController, User user) public List<Path> compact(ThroughputController throughputController, User user)
throws IOException { throws IOException {
return compactor.compact(request, throughputController, user); return compactor.compact(request, throughputController, user);
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** /**
@ -44,7 +45,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status) throws IOException { MonitoredTask status, ThroughputController throughputController) throws IOException {
ArrayList<Path> result = new ArrayList<Path>(); ArrayList<Path> result = new ArrayList<Path>();
int cellsCount = snapshot.getCellsCount(); int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries if (cellsCount == 0) return result; // don't flush if there are no entries
@ -71,7 +72,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
writer.setTimeRangeTracker(snapshot.getTimeRangeTracker()); writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
IOException e = null; IOException e = null;
try { try {
performFlush(scanner, writer, smallestReadPoint); performFlush(scanner, writer, smallestReadPoint, throughputController);
} catch (IOException ioe) { } catch (IOException ioe) {
e = ioe; e = ioe;
// throw the exception out // throw the exception out

View File

@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobStoreEngine; import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@ -458,7 +458,7 @@ public class HMobStore extends HStore {
*/ */
@Override @Override
public List<StoreFile> compact(CompactionContext compaction, public List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController) throws IOException { ThroughputController throughputController) throws IOException {
// If it's major compaction, try to find whether there's a sweeper is running // If it's major compaction, try to find whether there's a sweeper is running
// If yes, mark the major compaction as retainDeleteMarkers // If yes, mark the major compaction as retainDeleteMarkers
if (compaction.getRequest().isAllFiles()) { if (compaction.getRequest().isAllFiles()) {

View File

@ -151,9 +151,9 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey; import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -1727,12 +1727,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (Store s : getStores()) { for (Store s : getStores()) {
CompactionContext compaction = s.requestCompaction(); CompactionContext compaction = s.requestCompaction();
if (compaction != null) { if (compaction != null) {
CompactionThroughputController controller = null; ThroughputController controller = null;
if (rsServices != null) { if (rsServices != null) {
controller = CompactionThroughputControllerFactory.create(rsServices, conf); controller = CompactionThroughputControllerFactory.create(rsServices, conf);
} }
if (controller == null) { if (controller == null) {
controller = NoLimitCompactionThroughputController.INSTANCE; controller = NoLimitThroughputController.INSTANCE;
} }
compact(compaction, s, controller, null); compact(compaction, s, controller, null);
} }
@ -1749,7 +1749,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (Store s : getStores()) { for (Store s : getStores()) {
CompactionContext compaction = s.requestCompaction(); CompactionContext compaction = s.requestCompaction();
if (compaction != null) { if (compaction != null) {
compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null); compact(compaction, s, NoLimitThroughputController.INSTANCE, null);
} }
} }
} }
@ -1761,7 +1761,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException e * @throws IOException e
*/ */
@VisibleForTesting @VisibleForTesting
void compactStore(byte[] family, CompactionThroughputController throughputController) void compactStore(byte[] family, ThroughputController throughputController)
throws IOException { throws IOException {
Store s = getStore(family); Store s = getStore(family);
CompactionContext compaction = s.requestCompaction(); CompactionContext compaction = s.requestCompaction();
@ -1786,12 +1786,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return whether the compaction completed * @return whether the compaction completed
*/ */
public boolean compact(CompactionContext compaction, Store store, public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController) throws IOException { ThroughputController throughputController) throws IOException {
return compact(compaction, store, throughputController, null); return compact(compaction, store, throughputController, null);
} }
public boolean compact(CompactionContext compaction, Store store, public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
assert compaction != null && compaction.hasSelection(); assert compaction != null && compaction.hasSelection();
assert !compaction.getRequest().getFiles().isEmpty(); assert !compaction.getRequest().getFiles().isEmpty();
if (this.closing.get() || this.closed.get()) { if (this.closing.get() || this.closed.get()) {

View File

@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@ -139,6 +140,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@ -189,6 +192,7 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.Service; import com.google.protobuf.Service;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import sun.misc.Signal; import sun.misc.Signal;
import sun.misc.SignalHandler; import sun.misc.SignalHandler;
@ -198,7 +202,8 @@ import sun.misc.SignalHandler;
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public class HRegionServer extends HasThread implements RegionServerServices, LastSequenceId { public class HRegionServer extends HasThread implements
RegionServerServices, LastSequenceId, ConfigurationObserver {
private static final Log LOG = LogFactory.getLog(HRegionServer.class); private static final Log LOG = LogFactory.getLog(HRegionServer.class);
@ -487,6 +492,8 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
private CompactedHFilesDischarger compactedFileDischarger; private CompactedHFilesDischarger compactedFileDischarger;
private volatile ThroughputController flushThroughputController;
/** /**
* Starts a HRegionServer at the default location. * Starts a HRegionServer at the default location.
* @param conf * @param conf
@ -609,6 +616,7 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
putUpWebUI(); putUpWebUI();
this.walRoller = new LogRoller(this, this); this.walRoller = new LogRoller(this, this);
this.choreService = new ChoreService(getServerName().toString(), true); this.choreService = new ChoreService(getServerName().toString(), true);
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
if (!SystemUtils.IS_OS_WINDOWS) { if (!SystemUtils.IS_OS_WINDOWS) {
Signal.handle(new Signal("HUP"), new SignalHandler() { Signal.handle(new Signal("HUP"), new SignalHandler() {
@ -899,6 +907,7 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
// Registering the compactSplitThread object with the ConfigurationManager. // Registering the compactSplitThread object with the ConfigurationManager.
configurationManager.registerObserver(this.compactSplitThread); configurationManager.registerObserver(this.compactSplitThread);
configurationManager.registerObserver(this.rpcServices); configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this);
} }
/** /**
@ -3380,4 +3389,28 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
public boolean walRollRequestFinished() { public boolean walRollRequestFinished() {
return this.walRoller.walRollFinished(); return this.walRoller.walRollFinished();
} }
@Override
public ThroughputController getFlushThroughputController() {
return flushThroughputController;
}
@Override
public double getFlushPressure() {
if (getRegionServerAccounting() == null || cacheFlusher == null) {
// return 0 during RS initialization
return 0.0;
}
return getRegionServerAccounting().getGlobalMemstoreSize() * 1.0
/ cacheFlusher.globalMemStoreLimitLowMark;
}
@Override
public void onConfigurationChange(Configuration newConf) {
ThroughputController old = this.flushThroughputController;
if (old != null) {
old.stop("configuration change");
}
this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
}
} }

View File

@ -77,9 +77,9 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -845,7 +845,7 @@ public class HStore implements Store {
/** /**
* Snapshot this stores memstore. Call before running * Snapshot this stores memstore. Call before running
* {@link #flushCache(long, MemStoreSnapshot, MonitoredTask)} * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController)}
* so it has some work to do. * so it has some work to do.
*/ */
void snapshot() { void snapshot() {
@ -862,11 +862,12 @@ public class HStore implements Store {
* @param logCacheFlushId flush sequence number * @param logCacheFlushId flush sequence number
* @param snapshot * @param snapshot
* @param status * @param status
* @param throughputController
* @return The path name of the tmp file to which the store was flushed * @return The path name of the tmp file to which the store was flushed
* @throws IOException * @throws IOException if exception occurs during process
*/ */
protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
MonitoredTask status) throws IOException { MonitoredTask status, ThroughputController throughputController) throws IOException {
// If an exception happens flushing, we let it out without clearing // If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say // the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around. // 'snapshot', the next time flush comes around.
@ -876,7 +877,8 @@ public class HStore implements Store {
IOException lastException = null; IOException lastException = null;
for (int i = 0; i < flushRetriesNumber; i++) { for (int i = 0; i < flushRetriesNumber; i++) {
try { try {
List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status); List<Path> pathNames =
flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController);
Path lastPathName = null; Path lastPathName = null;
try { try {
for (Path pathName : pathNames) { for (Path pathName : pathNames) {
@ -1175,13 +1177,13 @@ public class HStore implements Store {
*/ */
@Override @Override
public List<StoreFile> compact(CompactionContext compaction, public List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController) throws IOException { ThroughputController throughputController) throws IOException {
return compact(compaction, throughputController, null); return compact(compaction, throughputController, null);
} }
@Override @Override
public List<StoreFile> compact(CompactionContext compaction, public List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
assert compaction != null; assert compaction != null;
List<StoreFile> sfs = null; List<StoreFile> sfs = null;
CompactionRequest cr = compaction.getRequest(); CompactionRequest cr = compaction.getRequest();
@ -2058,7 +2060,10 @@ public class HStore implements Store {
@Override @Override
public void flushCache(MonitoredTask status) throws IOException { public void flushCache(MonitoredTask status) throws IOException {
tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status); RegionServerServices rsService = region.getRegionServerServices();
ThroughputController throughputController =
rsService == null ? null : rsService.getFlushThroughputController();
tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController);
} }
@Override @Override

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -231,4 +232,16 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
* @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure() * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
*/ */
double getCompactionPressure(); double getCompactionPressure();
/**
* @return the controller to avoid flush too fast
*/
ThroughputController getFlushThroughputController();
/**
* @return the flush pressure of all stores on this regionserver. The value should be greater than
* or equal to 0.0, and any value greater than 1.0 means we enter the emergency state that
* global memstore size already exceeds lower limit.
*/
double getFlushPressure();
} }

View File

@ -22,8 +22,6 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.NavigableSet; import java.util.NavigableSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -32,6 +30,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
/** /**
@ -225,14 +225,14 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
void cancelRequestedCompaction(CompactionContext compaction); void cancelRequestedCompaction(CompactionContext compaction);
/** /**
* @deprecated see compact(CompactionContext, CompactionThroughputController, User) * @deprecated see compact(CompactionContext, ThroughputController, User)
*/ */
@Deprecated @Deprecated
List<StoreFile> compact(CompactionContext compaction, List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController) throws IOException; ThroughputController throughputController) throws IOException;
List<StoreFile> compact(CompactionContext compaction, List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController, User user) throws IOException; ThroughputController throughputController, User user) throws IOException;
/** /**
* @return true if we should run a major compaction. * @return true if we should run a major compaction.

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -27,10 +28,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
/** /**
* Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one). * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
@ -51,10 +55,11 @@ abstract class StoreFlusher {
* @param snapshot Memstore snapshot. * @param snapshot Memstore snapshot.
* @param cacheFlushSeqNum Log cache flush sequence number. * @param cacheFlushSeqNum Log cache flush sequence number.
* @param status Task that represents the flush operation and may be updated with status. * @param status Task that represents the flush operation and may be updated with status.
* @param throughputController A controller to avoid flush too fast
* @return List of files written. Can be empty; must not be null. * @return List of files written. Can be empty; must not be null.
*/ */
public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status) throws IOException; MonitoredTask status, ThroughputController throughputController) throws IOException;
protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum, protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
MonitoredTask status) throws IOException { MonitoredTask status) throws IOException {
@ -104,9 +109,10 @@ abstract class StoreFlusher {
* @param scanner Scanner to get data from. * @param scanner Scanner to get data from.
* @param sink Sink to write data to. Could be StoreFile.Writer. * @param sink Sink to write data to. Could be StoreFile.Writer.
* @param smallestReadPoint Smallest read point used for the flush. * @param smallestReadPoint Smallest read point used for the flush.
* @param throughputController A controller to avoid flush too fast
*/ */
protected void performFlush(InternalScanner scanner, protected void performFlush(InternalScanner scanner, Compactor.CellSink sink,
Compactor.CellSink sink, long smallestReadPoint) throws IOException { long smallestReadPoint, ThroughputController throughputController) throws IOException {
int compactionKVMax = int compactionKVMax =
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
@ -115,6 +121,13 @@ abstract class StoreFlusher {
List<Cell> kvs = new ArrayList<Cell>(); List<Cell> kvs = new ArrayList<Cell>();
boolean hasMore; boolean hasMore;
String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
// no control on system table (such as meta, namespace, etc) flush
boolean control = throughputController != null && !store.getRegionInfo().isSystemTable();
if (control) {
throughputController.start(flushName);
}
try {
do { do {
hasMore = scanner.next(kvs, scannerContext); hasMore = scanner.next(kvs, scannerContext);
if (!kvs.isEmpty()) { if (!kvs.isEmpty()) {
@ -123,9 +136,21 @@ abstract class StoreFlusher {
// set its memstoreTS to 0. This will help us save space when writing to // set its memstoreTS to 0. This will help us save space when writing to
// disk. // disk.
sink.append(c); sink.append(c);
int len = KeyValueUtil.length(c);
if (control) {
throughputController.control(flushName, len);
}
} }
kvs.clear(); kvs.clear();
} }
} while (hasMore); } while (hasMore);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted while control throughput of flushing "
+ flushName);
} finally {
if (control) {
throughputController.finish(flushName);
}
}
} }
} }

View File

@ -23,16 +23,16 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -100,14 +100,14 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
} }
@Override @Override
public List<Path> compact(CompactionThroughputController throughputController) public List<Path> compact(ThroughputController throughputController)
throws IOException { throws IOException {
Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
return this.stripeRequest.execute(compactor, throughputController, null); return this.stripeRequest.execute(compactor, throughputController, null);
} }
@Override @Override
public List<Path> compact(CompactionThroughputController throughputController, User user) public List<Path> compact(ThroughputController throughputController, User user)
throws IOException { throws IOException {
Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
return this.stripeRequest.execute(compactor, throughputController, user); return this.stripeRequest.execute(compactor, throughputController, user);

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -56,7 +57,7 @@ public class StripeStoreFlusher extends StoreFlusher {
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status) throws IOException { MonitoredTask status, ThroughputController throughputController) throws IOException {
List<Path> result = new ArrayList<Path>(); List<Path> result = new ArrayList<Path>();
int cellsCount = snapshot.getCellsCount(); int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries if (cellsCount == 0) return result; // don't flush if there are no entries
@ -80,7 +81,7 @@ public class StripeStoreFlusher extends StoreFlusher {
mw.init(storeScanner, factory, store.getComparator()); mw.init(storeScanner, factory, store.getComparator());
synchronized (flushLock) { synchronized (flushLock) {
performFlush(scanner, mw, smallestReadPoint); performFlush(scanner, mw, smallestReadPoint, throughputController);
result = mw.commitWriters(cacheFlushSeqNum, false); result = mw.commitWriters(cacheFlushSeqNum, false);
success = true; success = true;
} }

View File

@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -69,10 +70,10 @@ public abstract class CompactionContext {
* Runs the compaction based on current selection. select/forceSelect must have been called. * Runs the compaction based on current selection. select/forceSelect must have been called.
* @return The new file paths resulting from compaction. * @return The new file paths resulting from compaction.
*/ */
public abstract List<Path> compact(CompactionThroughputController throughputController) public abstract List<Path> compact(ThroughputController throughputController)
throws IOException; throws IOException;
public abstract List<Path> compact(CompactionThroughputController throughputController, User user) public abstract List<Path> compact(ThroughputController throughputController, User user)
throws IOException; throws IOException;
public CompactionRequest getRequest() { public CompactionRequest getRequest() {

View File

@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -48,10 +47,12 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
/** /**
@ -281,25 +282,6 @@ public abstract class Compactor {
} }
} }
/**
* Used to prevent compaction name conflict when multiple compactions running parallel on the
* same store.
*/
private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
private String generateCompactionName() {
int counter;
for (;;) {
counter = NAME_COUNTER.get();
int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
if (NAME_COUNTER.compareAndSet(counter, next)) {
break;
}
}
return store.getRegionInfo().getRegionNameAsString() + "#"
+ store.getFamily().getNameAsString() + "#" + counter;
}
/** /**
* Performs the compaction. * Performs the compaction.
* @param fd FileDetails of cell sink writer * @param fd FileDetails of cell sink writer
@ -312,7 +294,7 @@ public abstract class Compactor {
*/ */
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, long smallestReadPoint, boolean cleanSeqId,
CompactionThroughputController throughputController, boolean major) throws IOException { ThroughputController throughputController, boolean major) throws IOException {
long bytesWrittenProgressForCloseCheck = 0; long bytesWrittenProgressForCloseCheck = 0;
long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0; long bytesWrittenProgressForShippedCall = 0;
@ -324,7 +306,7 @@ public abstract class Compactor {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
lastMillis = EnvironmentEdgeManager.currentTime(); lastMillis = EnvironmentEdgeManager.currentTime();
} }
String compactionName = generateCompactionName(); String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
long now = 0; long now = 0;
boolean hasMore; boolean hasMore;
ScannerContext scannerContext = ScannerContext scannerContext =

View File

@ -34,11 +34,13 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
/** /**
* Compact passed set of files. Create an instance and then call * Compact passed set of files. Create an instance and then call
* {@link #compact(CompactionRequest, CompactionThroughputController, User)} * {@link #compact(CompactionRequest, ThroughputController, User)}
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class DefaultCompactor extends Compactor { public class DefaultCompactor extends Compactor {
@ -52,7 +54,7 @@ public class DefaultCompactor extends Compactor {
* Do a minor/major compaction on an explicit set of storefiles from a Store. * Do a minor/major compaction on an explicit set of storefiles from a Store.
*/ */
public List<Path> compact(final CompactionRequest request, public List<Path> compact(final CompactionRequest request,
CompactionThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles()); FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
this.progress = new CompactionProgress(fd.maxKeyCount); this.progress = new CompactionProgress(fd.maxKeyCount);
@ -173,7 +175,7 @@ public class DefaultCompactor extends Compactor {
/** /**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
* {@link #compact(CompactionRequest, CompactionThroughputController, User)}; * {@link #compact(CompactionRequest, ThroughputController, User)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for * @param filesToCompact the files to compact. These are used as the compactionSelection for
* the generated {@link CompactionRequest}. * the generated {@link CompactionRequest}.
* @param isMajor true to major compact (prune all deletes, max versions, etc) * @param isMajor true to major compact (prune all deletes, max versions, etc)
@ -185,6 +187,6 @@ public class DefaultCompactor extends Compactor {
throws IOException { throws IOException {
CompactionRequest cr = new CompactionRequest(filesToCompact); CompactionRequest cr = new CompactionRequest(filesToCompact);
cr.setIsMajor(isMajor, isMajor); cr.setIsMajor(isMajor, isMajor);
return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null); return this.compact(cr, NoLimitThroughputController.INSTANCE, null);
} }
} }

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils; import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists; import org.apache.hadoop.hbase.util.ConcatenatedLists;
@ -392,7 +393,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
protected byte[] majorRangeFromRow = null, majorRangeToRow = null; protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
public List<Path> execute(StripeCompactor compactor, public List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController) throws IOException { ThroughputController throughputController) throws IOException {
return execute(compactor, throughputController, null); return execute(compactor, throughputController, null);
} }
/** /**
@ -402,7 +403,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
* @return result of compact(...) * @return result of compact(...)
*/ */
public abstract List<Path> execute(StripeCompactor compactor, public abstract List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController, User user) throws IOException; ThroughputController throughputController, User user) throws IOException;
public StripeCompactionRequest(CompactionRequest request) { public StripeCompactionRequest(CompactionRequest request) {
this.request = request; this.request = request;
@ -454,7 +455,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
@Override @Override
public List<Path> execute(StripeCompactor compactor, public List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow, return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
this.majorRangeToRow, throughputController, user); this.majorRangeToRow, throughputController, user);
} }
@ -505,7 +506,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
@Override @Override
public List<Path> execute(StripeCompactor compactor, public List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow, return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user); this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
} }

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -53,14 +54,14 @@ public class StripeCompactor extends Compactor {
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries, public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] majorRangeFromRow, byte[] majorRangeToRow, byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException { ThroughputController throughputController) throws IOException {
return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow, return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow,
throughputController, null); throughputController, null);
} }
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries, public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] majorRangeFromRow, byte[] majorRangeToRow, byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:"); sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@ -77,14 +78,14 @@ public class StripeCompactor extends Compactor {
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize, public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException { ThroughputController throughputController) throws IOException {
return compact(request, targetCount, targetSize, left, right, majorRangeFromRow, return compact(request, targetCount, targetSize, left, right, majorRangeFromRow,
majorRangeToRow, throughputController, null); majorRangeToRow, throughputController, null);
} }
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize, public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + targetSize LOG.debug("Executing compaction with " + targetSize
+ " target file size, no more than " + targetCount + " files, in [" + " target file size, no more than " + targetCount + " files, in ["
@ -98,7 +99,7 @@ public class StripeCompactor extends Compactor {
private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request, private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
byte[] majorRangeFromRow, byte[] majorRangeToRow, byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
final Collection<StoreFile> filesToCompact = request.getFiles(); final Collection<StoreFile> filesToCompact = request.getFiles();
final FileDetails fd = getFileDetails(filesToCompact, request.isMajor()); final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
this.progress = new CompactionProgress(fd.maxKeyCount); this.progress = new CompactionProgress(fd.maxKeyCount);

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.util.ReflectionUtils;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public final class CompactionThroughputControllerFactory {
private static final Log LOG = LogFactory.getLog(CompactionThroughputControllerFactory.class);
public static final String HBASE_THROUGHPUT_CONTROLLER_KEY =
"hbase.regionserver.throughput.controller";
private CompactionThroughputControllerFactory() {
}
private static final Class<? extends ThroughputController>
DEFAULT_THROUGHPUT_CONTROLLER_CLASS = PressureAwareCompactionThroughputController.class;
// for backward compatibility and may not be supported in the future
private static final String DEPRECATED_NAME_OF_PRESSURE_AWARE_THROUGHPUT_CONTROLLER_CLASS =
"org.apache.hadoop.hbase.regionserver.compactions."
+ "PressureAwareCompactionThroughputController";
private static final String DEPRECATED_NAME_OF_NO_LIMIT_THROUGHPUT_CONTROLLER_CLASS =
"org.apache.hadoop.hbase.regionserver.compactions."
+ "NoLimitThroughputController.java";
public static ThroughputController create(RegionServerServices server,
Configuration conf) {
Class<? extends ThroughputController> clazz = getThroughputControllerClass(conf);
ThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
controller.setup(server);
return controller;
}
public static Class<? extends ThroughputController> getThroughputControllerClass(
Configuration conf) {
String className =
conf.get(HBASE_THROUGHPUT_CONTROLLER_KEY, DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName());
className = resolveDeprecatedClassName(className);
try {
return Class.forName(className).asSubclass(ThroughputController.class);
} catch (Exception e) {
LOG.warn(
"Unable to load configured throughput controller '" + className
+ "', load default throughput controller "
+ DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", e);
return DEFAULT_THROUGHPUT_CONTROLLER_CLASS;
}
}
/**
* Resolve deprecated class name to keep backward compatibiliy
* @param oldName old name of the class
* @return the new name if there is any
*/
private static String resolveDeprecatedClassName(String oldName) {
String className = oldName;
if (className.equals(DEPRECATED_NAME_OF_PRESSURE_AWARE_THROUGHPUT_CONTROLLER_CLASS)) {
className = PressureAwareCompactionThroughputController.class.getName();
} else if (className.equals(DEPRECATED_NAME_OF_NO_LIMIT_THROUGHPUT_CONTROLLER_CLASS)) {
className = NoLimitThroughputController.class.getName();
}
if (!className.equals(oldName)) {
LOG.warn(oldName + " is deprecated, please use " + className + " instead");
}
return className;
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -26,36 +26,40 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class CompactionThroughputControllerFactory { public final class FlushThroughputControllerFactory {
private static final Log LOG = LogFactory.getLog(CompactionThroughputControllerFactory.class); private static final Log LOG = LogFactory.getLog(FlushThroughputControllerFactory.class);
public static final String HBASE_THROUGHPUT_CONTROLLER_KEY = public static final String HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY =
"hbase.regionserver.throughput.controller"; "hbase.regionserver.flush.throughput.controller";
private static final Class<? extends CompactionThroughputController> private static final Class<? extends ThroughputController>
DEFAULT_THROUGHPUT_CONTROLLER_CLASS = PressureAwareCompactionThroughputController.class; DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS = NoLimitThroughputController.class;
public static CompactionThroughputController create(RegionServerServices server, private FlushThroughputControllerFactory() {
}
public static ThroughputController create(RegionServerServices server,
Configuration conf) { Configuration conf) {
Class<? extends CompactionThroughputController> clazz = getThroughputControllerClass(conf); Class<? extends ThroughputController> clazz = getThroughputControllerClass(conf);
CompactionThroughputController controller = ReflectionUtils.newInstance(clazz, conf); ThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
controller.setup(server); controller.setup(server);
return controller; return controller;
} }
public static Class<? extends CompactionThroughputController> getThroughputControllerClass( public static Class<? extends ThroughputController> getThroughputControllerClass(
Configuration conf) { Configuration conf) {
String className = String className =
conf.get(HBASE_THROUGHPUT_CONTROLLER_KEY, DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName()); conf.get(HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS.getName());
try { try {
return Class.forName(className).asSubclass(CompactionThroughputController.class); return Class.forName(className).asSubclass(ThroughputController.class);
} catch (Exception e) { } catch (Exception e) {
LOG.warn( LOG.warn(
"Unable to load configured throughput controller '" + className "Unable to load configured flush throughput controller '" + className
+ "', load default throughput controller " + "', load default throughput controller "
+ DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", e); + DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", e);
return DEFAULT_THROUGHPUT_CONTROLLER_CLASS; return DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS;
} }
} }
} }

View File

@ -15,20 +15,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/**
* A dummy CompactionThroughputController that does nothing.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class NoLimitCompactionThroughputController implements CompactionThroughputController { public class NoLimitThroughputController implements ThroughputController {
public static final NoLimitCompactionThroughputController INSTANCE = public static final NoLimitThroughputController INSTANCE = new NoLimitThroughputController();
new NoLimitCompactionThroughputController();
@Override @Override
public void setup(RegionServerServices server) { public void setup(RegionServerServices server) {
@ -47,7 +43,7 @@ public class NoLimitCompactionThroughputController implements CompactionThroughp
public void finish(String compactionName) { public void finish(String compactionName) {
} }
private volatile boolean stopped; private boolean stopped;
@Override @Override
public void stop(String why) { public void stop(String why) {
@ -61,6 +57,6 @@ public class NoLimitCompactionThroughputController implements CompactionThroughp
@Override @Override
public String toString() { public String toString() {
return "NoLimitCompactionThroughputController"; return "NoLimitThroughputController";
} }
} }

View File

@ -15,21 +15,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.throttle;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
/** /**
* A throughput controller which uses the follow schema to limit throughput * A throughput controller which uses the follow schema to limit throughput
@ -37,7 +32,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* <li>If compaction pressure is greater than 1.0, no limitation.</li> * <li>If compaction pressure is greater than 1.0, no limitation.</li>
* <li>In off peak hours, use a fixed throughput limitation * <li>In off peak hours, use a fixed throughput limitation
* {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li> * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li>
* <li>In normal hours, the max throughput is tune between * <li>In normal hours, the max throughput is tuned between
* {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and
* {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula &quot;lower + * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula &quot;lower +
* (higer - lower) * compactionPressure&quot;, where compactionPressure is in range [0.0, 1.0]</li> * (higer - lower) * compactionPressure&quot;, where compactionPressure is in range [0.0, 1.0]</li>
@ -45,8 +40,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure() * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class PressureAwareCompactionThroughputController extends Configured implements public class PressureAwareCompactionThroughputController extends PressureAwareThroughputController {
CompactionThroughputController, Stoppable {
private final static Log LOG = LogFactory private final static Log LOG = LogFactory
.getLog(PressureAwareCompactionThroughputController.class); .getLog(PressureAwareCompactionThroughputController.class);
@ -73,51 +67,12 @@ public class PressureAwareCompactionThroughputController extends Configured impl
private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000; private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000;
/** // check compaction throughput every this size
* Stores the information of one controlled compaction. private static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL =
*/ "hbase.hstore.compaction.throughput.control.check.interval";
private static final class ActiveCompaction {
private final long startTime;
private long lastControlTime;
private long lastControlSize;
private long totalSize;
private long numberOfSleeps;
private long totalSleepTime;
// prevent too many debug log
private long lastLogTime;
ActiveCompaction() {
long currentTime = EnvironmentEdgeManager.currentTime();
this.startTime = currentTime;
this.lastControlTime = currentTime;
this.lastLogTime = currentTime;
}
}
private long maxThroughputHigherBound;
private long maxThroughputLowerBound;
private long maxThroughputOffpeak; private long maxThroughputOffpeak;
private OffPeakHours offPeakHours;
private long controlPerSize;
private int tuningPeriod;
volatile double maxThroughput;
private final ConcurrentMap<String, ActiveCompaction> activeCompactions =
new ConcurrentHashMap<String, ActiveCompaction>();
@Override @Override
public void setup(final RegionServerServices server) { public void setup(final RegionServerServices server) {
server.getChoreService().scheduleChore( server.getChoreService().scheduleChore(
@ -141,14 +96,14 @@ public class PressureAwareCompactionThroughputController extends Configured impl
// compactionPressure is between 0.0 and 1.0, we use a simple linear formula to // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
// calculate the throughput limitation. // calculate the throughput limitation.
maxThroughputToSet = maxThroughputToSet =
maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound) maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound)
* compactionPressure; * compactionPressure;
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to " LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
+ throughputDesc(maxThroughputToSet)); + throughputDesc(maxThroughputToSet));
} }
this.maxThroughput = maxThroughputToSet; this.setMaxThroughput(maxThroughputToSet);
} }
@Override @Override
@ -157,7 +112,7 @@ public class PressureAwareCompactionThroughputController extends Configured impl
if (conf == null) { if (conf == null) {
return; return;
} }
this.maxThroughputHigherBound = this.maxThroughputUpperBound =
conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND); DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND);
this.maxThroughputLowerBound = this.maxThroughputLowerBound =
@ -167,97 +122,32 @@ public class PressureAwareCompactionThroughputController extends Configured impl
conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK, conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK,
DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK); DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK);
this.offPeakHours = OffPeakHours.getInstance(conf); this.offPeakHours = OffPeakHours.getInstance(conf);
this.controlPerSize = this.maxThroughputLowerBound; this.controlPerSize =
this.maxThroughput = this.maxThroughputLowerBound; conf.getLong(HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL,
this.maxThroughputLowerBound);
this.setMaxThroughput(this.maxThroughputLowerBound);
this.tuningPeriod = this.tuningPeriod =
getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD); DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD);
LOG.info("Compaction throughput configurations, higher bound: " LOG.info("Compaction throughput configurations, higher bound: "
+ throughputDesc(maxThroughputHigherBound) + ", lower bound " + throughputDesc(maxThroughputUpperBound) + ", lower bound "
+ throughputDesc(maxThroughputLowerBound) + ", off peak: " + throughputDesc(maxThroughputLowerBound) + ", off peak: "
+ throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms"); + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms");
} }
private String throughputDesc(long deltaSize, long elapsedTime) {
return throughputDesc((double) deltaSize / elapsedTime * 1000);
}
private String throughputDesc(double speed) {
if (speed >= 1E15) { // large enough to say it is unlimited
return "unlimited";
} else {
return String.format("%.2f MB/sec", speed / 1024 / 1024);
}
}
@Override
public void start(String compactionName) {
activeCompactions.put(compactionName, new ActiveCompaction());
}
@Override
public long control(String compactionName, long size) throws InterruptedException {
ActiveCompaction compaction = activeCompactions.get(compactionName);
compaction.totalSize += size;
long deltaSize = compaction.totalSize - compaction.lastControlSize;
if (deltaSize < controlPerSize) {
return 0;
}
long now = EnvironmentEdgeManager.currentTime();
double maxThroughputPerCompaction = this.maxThroughput / activeCompactions.size();
long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms
long elapsedTime = now - compaction.lastControlTime;
compaction.lastControlSize = compaction.totalSize;
if (elapsedTime >= minTimeAllowed) {
compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
return 0;
}
// too fast
long sleepTime = minTimeAllowed - elapsedTime;
if (LOG.isDebugEnabled()) {
// do not log too much
if (now - compaction.lastLogTime > 60L * 1000) {
LOG.debug(compactionName + " sleep " + sleepTime + " ms because current throughput is "
+ throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
+ throughputDesc(maxThroughputPerCompaction) + ", already slept "
+ compaction.numberOfSleeps + " time(s) and total slept time is "
+ compaction.totalSleepTime + " ms till now.");
compaction.lastLogTime = now;
}
}
Thread.sleep(sleepTime);
compaction.numberOfSleeps++;
compaction.totalSleepTime += sleepTime;
compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
return sleepTime;
}
@Override
public void finish(String compactionName) {
ActiveCompaction compaction = activeCompactions.remove(compactionName);
long elapsedTime = Math.max(1, EnvironmentEdgeManager.currentTime() - compaction.startTime);
LOG.info(compactionName + " average throughput is "
+ throughputDesc(compaction.totalSize, elapsedTime) + ", slept "
+ compaction.numberOfSleeps + " time(s) and total slept time is "
+ compaction.totalSleepTime + " ms. " + activeCompactions.size()
+ " active compactions remaining, total limit is " + throughputDesc(maxThroughput));
}
private volatile boolean stopped = false;
@Override
public void stop(String why) {
stopped = true;
}
@Override
public boolean isStopped() {
return stopped;
}
@Override @Override
public String toString() { public String toString() {
return "DefaultCompactionThroughputController [maxThroughput=" + throughputDesc(maxThroughput) return "DefaultCompactionThroughputController [maxThroughput="
+ ", activeCompactions=" + activeCompactions.size() + "]"; + throughputDesc(getMaxThroughput()) + ", activeCompactions=" + activeOperations.size()
+ "]";
}
@Override
protected boolean skipControl(long deltaSize, long controlSize) {
if (deltaSize < controlSize) {
return true;
} else {
return false;
}
} }
} }

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
/**
* A throughput controller which uses the follow schema to limit throughput
* <ul>
* <li>If flush pressure is greater than or equal to 1.0, no limitation.</li>
* <li>In normal case, the max throughput is tuned between
* {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND} and
* {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND}, using the formula &quot;lower +
* (upper - lower) * flushPressure&quot;, where flushPressure is in range [0.0, 1.0)</li>
* </ul>
* @see org.apache.hadoop.hbase.regionserver.HRegionServer#getFlushPressure()
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class PressureAwareFlushThroughputController extends PressureAwareThroughputController {
private static final Log LOG = LogFactory.getLog(PressureAwareFlushThroughputController.class);
public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND =
"hbase.hstore.flush.throughput.upper.bound";
private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND =
200L * 1024 * 1024;
public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND =
"hbase.hstore.flush.throughput.lower.bound";
private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND =
100L * 1024 * 1024;
public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD =
"hbase.hstore.flush.throughput.tune.period";
private static final int DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD = 20 * 1000;
// check flush throughput every this size
public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL =
"hbase.hstore.flush.throughput.control.check.interval";
private static final long DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL =
10L * 1024 * 1024;// 10MB
@Override
public void setup(final RegionServerServices server) {
server.getChoreService().scheduleChore(
new ScheduledChore("FlushThroughputTuner", this, tuningPeriod, this.tuningPeriod) {
@Override
protected void chore() {
tune(server.getFlushPressure());
}
});
}
private void tune(double flushPressure) {
double maxThroughputToSet;
if (flushPressure >= 1.0) {
// set to unlimited if global memstore size already exceeds lower limit
maxThroughputToSet = Double.MAX_VALUE;
} else {
// flushPressure is between 0.0 and 1.0, we use a simple linear formula to
// calculate the throughput limitation.
maxThroughputToSet =
maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound)
* flushPressure;
}
if (LOG.isDebugEnabled()) {
LOG.debug("flushPressure is " + flushPressure + ", tune flush throughput to "
+ throughputDesc(maxThroughputToSet));
}
this.setMaxThroughput(maxThroughputToSet);
}
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
if (conf == null) {
return;
}
this.maxThroughputUpperBound =
conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND,
DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND);
this.maxThroughputLowerBound =
conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND,
DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND);
this.offPeakHours = OffPeakHours.getInstance(conf);
this.controlPerSize =
conf.getLong(HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL,
DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL);
this.setMaxThroughput(this.maxThroughputLowerBound);
this.tuningPeriod =
getConf().getInt(HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD);
LOG.info("Flush throughput configurations, upper bound: "
+ throughputDesc(maxThroughputUpperBound) + ", lower bound "
+ throughputDesc(maxThroughputLowerBound) + ", tuning period: " + tuningPeriod + " ms");
}
@Override
public String toString() {
return "DefaultFlushController [maxThroughput=" + throughputDesc(getMaxThroughput())
+ ", activeFlushNumber=" + activeOperations.size() + "]";
}
@Override
protected boolean skipControl(long deltaSize, long controlSize) {
// for flush, we control the flow no matter whether the flush size is small
return false;
}
}

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public abstract class PressureAwareThroughputController extends Configured implements
ThroughputController, Stoppable {
private static final Log LOG = LogFactory.getLog(PressureAwareThroughputController.class);
/**
* Stores the information of one controlled compaction.
*/
private static final class ActiveOperation {
private final long startTime;
private long lastControlTime;
private long lastControlSize;
private long totalSize;
private long numberOfSleeps;
private long totalSleepTime;
// prevent too many debug log
private long lastLogTime;
ActiveOperation() {
long currentTime = EnvironmentEdgeManager.currentTime();
this.startTime = currentTime;
this.lastControlTime = currentTime;
this.lastLogTime = currentTime;
}
}
protected long maxThroughputUpperBound;
protected long maxThroughputLowerBound;
protected OffPeakHours offPeakHours;
protected long controlPerSize;
protected int tuningPeriod;
private volatile double maxThroughput;
protected final ConcurrentMap<String, ActiveOperation> activeOperations =
new ConcurrentHashMap<String, ActiveOperation>();
@Override
public abstract void setup(final RegionServerServices server);
protected String throughputDesc(long deltaSize, long elapsedTime) {
return throughputDesc((double) deltaSize / elapsedTime * 1000);
}
protected String throughputDesc(double speed) {
if (speed >= 1E15) { // large enough to say it is unlimited
return "unlimited";
} else {
return String.format("%.2f MB/sec", speed / 1024 / 1024);
}
}
@Override
public void start(String opName) {
activeOperations.put(opName, new ActiveOperation());
}
@Override
public long control(String opName, long size) throws InterruptedException {
ActiveOperation operation = activeOperations.get(opName);
operation.totalSize += size;
long deltaSize = operation.totalSize - operation.lastControlSize;
if (deltaSize < controlPerSize) {
return 0;
}
long now = EnvironmentEdgeManager.currentTime();
double maxThroughputPerCompaction = this.getMaxThroughput() / activeOperations.size();
long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms
long elapsedTime = now - operation.lastControlTime;
operation.lastControlSize = operation.totalSize;
if (elapsedTime >= minTimeAllowed) {
operation.lastControlTime = EnvironmentEdgeManager.currentTime();
return 0;
}
// too fast
long sleepTime = minTimeAllowed - elapsedTime;
if (LOG.isDebugEnabled()) {
// do not log too much
if (now - operation.lastLogTime > 5L * 1000) {
LOG.debug("deltaSize: " + deltaSize + " bytes; elapseTime: " + elapsedTime + " ns");
LOG.debug(opName + " sleep " + sleepTime + " ms because current throughput is "
+ throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
+ throughputDesc(maxThroughputPerCompaction) + ", already slept "
+ operation.numberOfSleeps + " time(s) and total slept time is "
+ operation.totalSleepTime + " ms till now.");
operation.lastLogTime = now;
}
}
Thread.sleep(sleepTime);
operation.numberOfSleeps++;
operation.totalSleepTime += sleepTime;
operation.lastControlTime = EnvironmentEdgeManager.currentTime();
return sleepTime;
}
/**
* Check whether to skip control given delta size and control size
* @param deltaSize Delta size since last control
* @param controlSize Size limit to perform control
* @return a boolean indicates whether to skip this control
*/
protected abstract boolean skipControl(long deltaSize, long controlSize);
@Override
public void finish(String opName) {
ActiveOperation operation = activeOperations.remove(opName);
long elapsedTime = EnvironmentEdgeManager.currentTime() - operation.startTime;
LOG.info(opName + " average throughput is "
+ throughputDesc(operation.totalSize, elapsedTime) + ", slept "
+ operation.numberOfSleeps + " time(s) and total slept time is "
+ operation.totalSleepTime + " ms. " + activeOperations.size()
+ " active operations remaining, total limit is " + throughputDesc(getMaxThroughput()));
}
private volatile boolean stopped = false;
@Override
public void stop(String why) {
stopped = true;
}
@Override
public boolean isStopped() {
return stopped;
}
public double getMaxThroughput() {
return maxThroughput;
}
public void setMaxThroughput(double maxThroughput) {
this.maxThroughput = maxThroughput;
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.Store;
/**
* Helper methods for throttling
*/
@InterfaceAudience.Private
public final class ThroughputControlUtil {
private ThroughputControlUtil() {
}
private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
private static final String NAME_DELIMITER = "#";
/**
* Generate a name for throttling, to prevent name conflict when multiple IO operation running
* parallel on the same store.
* @param store the Store instance on which IO operation is happening
* @param opName Name of the IO operation, e.g. "flush", "compaction", etc.
* @return The name for throttling
*/
public static String getNameForThrottling(final Store store, final String opName) {
int counter;
for (;;) {
counter = NAME_COUNTER.get();
int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
if (NAME_COUNTER.compareAndSet(counter, next)) {
break;
}
}
return store.getRegionInfo().getRegionNameAsString() + NAME_DELIMITER
+ store.getFamily().getNameAsString() + NAME_DELIMITER + opName + NAME_DELIMITER + counter;
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
@ -23,11 +23,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/** /**
* A utility that constrains the total throughput of one or more simultaneous flows (compactions) by * A utility that constrains the total throughput of one or more simultaneous flows by
* sleeping when necessary. * sleeping when necessary.
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public interface CompactionThroughputController extends Stoppable { public interface ThroughputController extends Stoppable {
/** /**
* Setup controller for the given region server. * Setup controller for the given region server.
@ -35,18 +35,18 @@ public interface CompactionThroughputController extends Stoppable {
void setup(RegionServerServices server); void setup(RegionServerServices server);
/** /**
* Start a compaction. * Start the throughput controller.
*/ */
void start(String compactionName); void start(String name);
/** /**
* Control the compaction throughput. Will sleep if too fast. * Control the throughput. Will sleep if too fast.
* @return the actual sleep time. * @return the actual sleep time.
*/ */
long control(String compactionName, long size) throws InterruptedException; long control(String name, long size) throws InterruptedException;
/** /**
* Finish a compaction. Should call this method in a finally block. * Finish the controller. Should call this method in a finally block.
*/ */
void finish(String compactionName); void finish(String name);
} }

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager; import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@ -317,4 +318,13 @@ public class MockRegionServerServices implements RegionServerServices {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }
public ThroughputController getFlushThroughputController() {
return null;
}
@Override
public double getFlushPressure() {
return 0;
}
} }

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@ -44,7 +43,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -122,7 +121,7 @@ public class TestIOFencing {
@Override @Override
public boolean compact(CompactionContext compaction, Store store, public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController) throws IOException { ThroughputController throughputController) throws IOException {
try { try {
return super.compact(compaction, store, throughputController); return super.compact(compaction, store, throughputController);
} finally { } finally {
@ -132,7 +131,7 @@ public class TestIOFencing {
@Override @Override
public boolean compact(CompactionContext compaction, Store store, public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
try { try {
return super.compact(compaction, store, throughputController, user); return super.compact(compaction, store, throughputController, user);
} finally { } finally {

View File

@ -60,12 +60,12 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -235,7 +235,7 @@ public class TestRegionObserverScannerOpenHook {
@Override @Override
public boolean compact(CompactionContext compaction, Store store, public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController) throws IOException { ThroughputController throughputController) throws IOException {
boolean ret = super.compact(compaction, store, throughputController); boolean ret = super.compact(compaction, store, throughputController);
if (ret) compactionStateChangeLatch.countDown(); if (ret) compactionStateChangeLatch.countDown();
return ret; return ret;
@ -243,7 +243,7 @@ public class TestRegionObserverScannerOpenHook {
@Override @Override
public boolean compact(CompactionContext compaction, Store store, public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController, User user) throws IOException { ThroughputController throughputController, User user) throws IOException {
boolean ret = super.compact(compaction, store, throughputController, user); boolean ret = super.compact(compaction, store, throughputController, user);
if (ret) compactionStateChangeLatch.countDown(); if (ret) compactionStateChangeLatch.countDown();
return ret; return ret;

View File

@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager; import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@ -662,4 +663,13 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }
public ThroughputController getFlushThroughputController() {
return null;
}
@Override
public double getFlushPressure() {
return 0;
}
} }

View File

@ -22,8 +22,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert; import org.junit.Assert;

View File

@ -59,9 +59,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
@ -104,7 +104,7 @@ public class TestCompaction {
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100); conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitCompactionThroughputController.class.getName()); NoLimitThroughputController.class.getName());
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3); compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
secondRowBytes = START_KEY_BYTES.clone(); secondRowBytes = START_KEY_BYTES.clone();
@ -363,13 +363,13 @@ public class TestCompaction {
} }
@Override @Override
public List<Path> compact(CompactionThroughputController throughputController) public List<Path> compact(ThroughputController throughputController)
throws IOException { throws IOException {
return compact(throughputController, null); return compact(throughputController, null);
} }
@Override @Override
public List<Path> compact(CompactionThroughputController throughputController, User user) public List<Path> compact(ThroughputController throughputController, User user)
throws IOException { throws IOException {
finishCompaction(this.selectedFiles); finishCompaction(this.selectedFiles);
return new ArrayList<Path>(); return new ArrayList<Path>();
@ -421,13 +421,13 @@ public class TestCompaction {
} }
@Override @Override
public List<Path> compact(CompactionThroughputController throughputController) public List<Path> compact(ThroughputController throughputController)
throws IOException { throws IOException {
return compact(throughputController, null); return compact(throughputController, null);
} }
@Override @Override
public List<Path> compact(CompactionThroughputController throughputController, User user) public List<Path> compact(ThroughputController throughputController, User user)
throws IOException { throws IOException {
try { try {
isInCompact = true; isInCompact = true;
@ -510,10 +510,10 @@ public class TestCompaction {
HRegion r = mock(HRegion.class); HRegion r = mock(HRegion.class);
when( when(
r.compact(any(CompactionContext.class), any(Store.class), r.compact(any(CompactionContext.class), any(Store.class),
any(CompactionThroughputController.class), any(User.class))).then(new Answer<Boolean>() { any(ThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
public Boolean answer(InvocationOnMock invocation) throws Throwable { public Boolean answer(InvocationOnMock invocation) throws Throwable {
invocation.getArgumentAt(0, CompactionContext.class).compact( invocation.getArgumentAt(0, CompactionContext.class).compact(
invocation.getArgumentAt(2, CompactionThroughputController.class)); invocation.getArgumentAt(2, ThroughputController.class));
return true; return true;
} }
}); });

View File

@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -537,7 +537,7 @@ public class TestHMobStore {
// Trigger major compaction // Trigger major compaction
this.store.triggerMajorCompaction(); this.store.triggerMajorCompaction();
CompactionContext requestCompaction = this.store.requestCompaction(1, null); CompactionContext requestCompaction = this.store.requestCompaction(1, null);
this.store.compact(requestCompaction, NoLimitCompactionThroughputController.INSTANCE); this.store.compact(requestCompaction, NoLimitThroughputController.INSTANCE);
Assert.assertEquals(1, this.store.getStorefiles().size()); Assert.assertEquals(1, this.store.getStorefiles().size());
//Check encryption after compaction //Check encryption after compaction

View File

@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -314,7 +314,7 @@ public class TestHRegionReplayEvents {
// compaction from primary // compaction from primary
LOG.info("-- Compacting primary, only 1 store"); LOG.info("-- Compacting primary, only 1 store");
primaryRegion.compactStore(Bytes.toBytes("cf1"), primaryRegion.compactStore(Bytes.toBytes("cf1"),
NoLimitCompactionThroughputController.INSTANCE); NoLimitThroughputController.INSTANCE);
// now replay the edits and the flush marker // now replay the edits and the flush marker
reader = createWALReaderForPrimary(); reader = createWALReaderForPrimary();

View File

@ -85,7 +85,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -259,7 +259,7 @@ public class TestSplitTransactionOnCluster {
region.initialize(); region.initialize();
// 2, Run Compaction cc // 2, Run Compaction cc
assertFalse(region.compact(cc, store, NoLimitCompactionThroughputController.INSTANCE)); assertFalse(region.compact(cc, store, NoLimitThroughputController.INSTANCE));
assertTrue(fileNum > store.getStorefiles().size()); assertTrue(fileNum > store.getStorefiles().size());
// 3, Split // 3, Split

View File

@ -72,7 +72,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -382,7 +382,7 @@ public class TestStore {
Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS); Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
// after compact; check the lowest time stamp // after compact; check the lowest time stamp
store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE); store.compact(store.requestCompaction(), NoLimitThroughputController.INSTANCE);
lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles()); lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles()); lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS); Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);

View File

@ -52,8 +52,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -133,7 +133,7 @@ public class TestStripeCompactor {
StripeCompactor sc = createCompactor(writers, input); StripeCompactor sc = createCompactor(writers, input);
List<Path> paths = List<Path> paths =
sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo, sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
NoLimitCompactionThroughputController.INSTANCE); NoLimitThroughputController.INSTANCE);
writers.verifyKvs(output, allFiles, true); writers.verifyKvs(output, allFiles, true);
if (allFiles) { if (allFiles) {
assertEquals(output.length, paths.size()); assertEquals(output.length, paths.size());
@ -170,7 +170,7 @@ public class TestStripeCompactor {
StripeCompactor sc = createCompactor(writers, input); StripeCompactor sc = createCompactor(writers, input);
List<Path> paths = List<Path> paths =
sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null, sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
NoLimitCompactionThroughputController.INSTANCE); NoLimitThroughputController.INSTANCE);
assertEquals(output.length, paths.size()); assertEquals(output.length, paths.size());
writers.verifyKvs(output, true, true); writers.verifyKvs(output, true, true);
List<byte[]> boundaries = new ArrayList<byte[]>(); List<byte[]> boundaries = new ArrayList<byte[]>();

View File

@ -37,10 +37,10 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -77,7 +77,7 @@ public class TestStripeStoreEngine {
when( when(
mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class), mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class),
any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class),
any(CompactionThroughputController.class), any(User.class))) any(ThroughputController.class), any(User.class)))
.thenReturn(new ArrayList<Path>()); .thenReturn(new ArrayList<Path>());
// Produce 3 L0 files. // Produce 3 L0 files.
@ -96,10 +96,10 @@ public class TestStripeStoreEngine {
assertEquals(2, compaction.getRequest().getFiles().size()); assertEquals(2, compaction.getRequest().getFiles().size());
assertFalse(compaction.getRequest().getFiles().contains(sf)); assertFalse(compaction.getRequest().getFiles().contains(sf));
// Make sure the correct method it called on compactor. // Make sure the correct method it called on compactor.
compaction.compact(NoLimitCompactionThroughputController.INSTANCE); compaction.compact(NoLimitThroughputController.INSTANCE);
verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L, verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null, StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null,
NoLimitCompactionThroughputController.INSTANCE, null); NoLimitThroughputController.INSTANCE, null);
} }
private static StoreFile createFile() throws Exception { private static StoreFile createFile() throws Exception {

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture; import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -216,10 +217,10 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al())); assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
assertEquals(si.getStorefiles(), scr.getRequest().getFiles()); assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
any(NoLimitCompactionThroughputController.class), any(User.class)); any(NoLimitThroughputController.class), any(User.class));
} }
@Test @Test
@ -469,7 +470,7 @@ public class TestStripeCompactionPolicy {
// All the Stripes are expired, so the Compactor will not create any Writers. We need to create // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
// an empty file to preserve metadata // an empty file to preserve metadata
StripeCompactor sc = createCompactor(); StripeCompactor sc = createCompactor();
List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
assertEquals(1, paths.size()); assertEquals(1, paths.size());
} }
@ -528,7 +529,7 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al())); assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() { verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
@Override @Override
public boolean matches(Object argument) { public boolean matches(Object argument) {
@ -542,7 +543,7 @@ public class TestStripeCompactionPolicy {
} }
}), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
any(NoLimitCompactionThroughputController.class), any(User.class)); any(NoLimitThroughputController.class), any(User.class));
} }
/** /**
@ -563,12 +564,12 @@ public class TestStripeCompactionPolicy {
assertTrue(!needsCompaction || policy.needsCompactions(si, al())); assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, times(1)).compact(eq(scr.getRequest()), verify(sc, times(1)).compact(eq(scr.getRequest()),
count == null ? anyInt() : eq(count.intValue()), count == null ? anyInt() : eq(count.intValue()),
size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
any(NoLimitCompactionThroughputController.class), any(User.class)); any(NoLimitThroughputController.class), any(User.class));
} }
/** Verify arbitrary flush. */ /** Verify arbitrary flush. */

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.throttle;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -45,6 +45,10 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreEngine; import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -143,7 +147,7 @@ public class TestCompactionWithThroughputController {
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitCompactionThroughputController.class.getName()); NoLimitThroughputController.class.getName());
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);
try { try {
Store store = prepareData(); Store store = prepareData();
@ -204,7 +208,7 @@ public class TestCompactionWithThroughputController {
PressureAwareCompactionThroughputController throughputController = PressureAwareCompactionThroughputController throughputController =
(PressureAwareCompactionThroughputController) regionServer.compactSplitThread (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
.getCompactionThroughputController(); .getCompactionThroughputController();
assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON); assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
Table table = conn.getTable(tableName); Table table = conn.getTable(tableName);
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
byte[] value = new byte[0]; byte[] value = new byte[0];
@ -212,26 +216,26 @@ public class TestCompactionWithThroughputController {
TEST_UTIL.flush(tableName); TEST_UTIL.flush(tableName);
} }
Thread.sleep(2000); Thread.sleep(2000);
assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON); assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
byte[] value1 = new byte[0]; byte[] value1 = new byte[0];
table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1)); table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1));
TEST_UTIL.flush(tableName); TEST_UTIL.flush(tableName);
Thread.sleep(2000); Thread.sleep(2000);
assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON); assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
byte[] value = new byte[0]; byte[] value = new byte[0];
table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value)); table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value));
TEST_UTIL.flush(tableName); TEST_UTIL.flush(tableName);
Thread.sleep(2000); Thread.sleep(2000);
assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON); assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitCompactionThroughputController.class.getName()); NoLimitThroughputController.class.getName());
regionServer.compactSplitThread.onConfigurationChange(conf); regionServer.compactSplitThread.onConfigurationChange(conf);
assertTrue(throughputController.isStopped()); assertTrue(throughputController.isStopped());
assertTrue(regionServer.compactSplitThread.getCompactionThroughputController() assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
instanceof NoLimitCompactionThroughputController); instanceof NoLimitThroughputController);
} finally { } finally {
conn.close(); conn.close();
TEST_UTIL.shutdownMiniCluster(); TEST_UTIL.shutdownMiniCluster();

View File

@ -0,0 +1,217 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestFlushWithThroughputController {
private static final Log LOG = LogFactory.getLog(TestFlushWithThroughputController.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final double EPSILON = 1E-6;
private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
private final byte[] family = Bytes.toBytes("f");
private final byte[] qualifier = Bytes.toBytes("q");
private Store getStoreWithName(TableName tableName) {
MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
HRegionServer hrs = rsts.get(i).getRegionServer();
for (Region region : hrs.getOnlineRegions(tableName)) {
return region.getStores().iterator().next();
}
}
return null;
}
private Store generateAndFlushData() throws IOException {
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
HTable table = TEST_UTIL.createTable(tableName, family);
Random rand = new Random();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
byte[] value = new byte[256 * 1024];
rand.nextBytes(value);
table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
}
admin.flush(tableName);
}
return getStoreWithName(tableName);
}
private long testFlushWithThroughputLimit() throws Exception {
long throughputLimit = 1L * 1024 * 1024;
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
PressureAwareFlushThroughputController.class.getName());
conf.setLong(
PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND,
throughputLimit);
conf.setLong(
PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND,
throughputLimit);
conf.setLong(
PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL,
throughputLimit);
TEST_UTIL.startMiniCluster(1);
try {
long startTime = System.nanoTime();
Store store = generateAndFlushData();
assertEquals(10, store.getStorefilesCount());
long duration = System.nanoTime() - startTime;
double throughput = (double) store.getStorefilesSize() / duration * 1000 * 1000 * 1000;
LOG.debug("Throughput is: " + (throughput / 1024 / 1024) + " MB/s");
// confirm that the speed limit work properly(not too fast, and also not too slow)
// 20% is the max acceptable error rate.
assertTrue(throughput < throughputLimit * 1.2);
assertTrue(throughput > throughputLimit * 0.8);
return duration;
} finally {
TEST_UTIL.shutdownMiniCluster();
}
}
private long testFlushWithoutThroughputLimit() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
NoLimitThroughputController.class.getName());
TEST_UTIL.startMiniCluster(1);
try {
long startTime = System.nanoTime();
Store store = generateAndFlushData();
assertEquals(10, store.getStorefilesCount());
long duration = System.nanoTime() - startTime;
double throughput = (double) store.getStorefilesSize() / duration * 1000 * 1000 * 1000;
LOG.debug("Throughput w/o limit is: " + (throughput / 1024 / 1024) + " MB/s");
return duration;
} finally {
TEST_UTIL.shutdownMiniCluster();
}
}
@Test
public void testFlushControl() throws Exception {
long limitTime = testFlushWithThroughputLimit();
long noLimitTime = testFlushWithoutThroughputLimit();
LOG.info("With 1M/s limit, flush use " + (limitTime / 1000000)
+ "ms; without limit, flush use " + (noLimitTime / 1000000) + "ms");
// Commonly if multiple region flush at the same time, the throughput could be very high
// but flush in this test is in serial, so we use a weak assumption.
assertTrue(limitTime > 2 * noLimitTime);
}
/**
* Test the tuning task of {@link PressureAwareFlushThroughputController}
*/
@Test
public void testFlushThroughputTuning() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
conf.setLong(
PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND,
20L * 1024 * 1024);
conf.setLong(
PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND,
10L * 1024 * 1024);
conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
PressureAwareFlushThroughputController.class.getName());
conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
3000);
TEST_UTIL.startMiniCluster(1);
assertEquals(10L * 1024 * 1024,
((PressureAwareThroughputController) TEST_UTIL.getMiniHBaseCluster().getRegionServer(0)
.getFlushThroughputController()).getMaxThroughput(), EPSILON);
Connection conn = ConnectionFactory.createConnection(conf);
try {
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(family));
htd.setCompactionEnabled(false);
TEST_UTIL.getHBaseAdmin().createTable(htd);
TEST_UTIL.waitTableAvailable(tableName);
HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
PressureAwareFlushThroughputController throughputController =
(PressureAwareFlushThroughputController) regionServer.getFlushThroughputController();
Table table = conn.getTable(tableName);
Random rand = new Random();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
byte[] value = new byte[256 * 1024];
rand.nextBytes(value);
table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
}
}
Thread.sleep(5000);
double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure());
assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON);
conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
NoLimitThroughputController.class.getName());
regionServer.onConfigurationChange(conf);
assertTrue(throughputController.isStopped());
assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitThroughputController);
} finally {
conn.close();
TEST_UTIL.shutdownMiniCluster();
}
}
/**
* Test the logic for striped store.
*/
@Test
public void testFlushControlForStripedStore() throws Exception {
TEST_UTIL.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY,
StripeStoreEngine.class.getName());
testFlushControl();
}
}

View File

@ -64,7 +64,19 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.*; import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -644,11 +656,11 @@ public class TestWALReplay {
} }
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status) throws IOException { MonitoredTask status, ThroughputController throughputController) throws IOException {
if (throwExceptionWhenFlushing.get()) { if (throwExceptionWhenFlushing.get()) {
throw new IOException("Simulated exception by tests"); throw new IOException("Simulated exception by tests");
} }
return super.flushSnapshot(snapshot, cacheFlushId, status); return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController);
} }
}; };