HBASE-14969 Add throughput controller for flush

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
绝顶 2016-01-29 09:38:13 +08:00 committed by zhangduo
parent 2f571b1457
commit 0d21fa9279
41 changed files with 1079 additions and 332 deletions

View File

@ -41,8 +41,8 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@ -89,7 +89,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
private final ThreadPoolExecutor splits;
private final ThreadPoolExecutor mergePool;
private volatile CompactionThroughputController compactionThroughputController;
private volatile ThroughputController compactionThroughputController;
/**
* Splitting should not take place if the total number of regions exceed this.
@ -671,7 +671,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
}
}
CompactionThroughputController old = this.compactionThroughputController;
ThroughputController old = this.compactionThroughputController;
if (old != null) {
old.stop("configuration change");
}
@ -716,7 +716,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
}
@VisibleForTesting
public CompactionThroughputController getCompactionThroughputController() {
public ThroughputController getCompactionThroughputController() {
return compactionThroughputController;
}

View File

@ -55,7 +55,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@ -162,7 +162,7 @@ public class CompactionTool extends Configured implements Tool {
CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
if (compaction == null) break;
List<StoreFile> storeFiles =
store.compact(compaction, NoLimitCompactionThroughputController.INSTANCE);
store.compact(compaction, NoLimitThroughputController.INSTANCE);
if (storeFiles != null && !storeFiles.isEmpty()) {
if (keepCompactedFiles && deleteCompacted) {
for (StoreFile storeFile: storeFiles) {

View File

@ -21,16 +21,16 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@ -108,13 +108,13 @@ public class DefaultStoreEngine extends StoreEngine<
}
@Override
public List<Path> compact(CompactionThroughputController throughputController)
public List<Path> compact(ThroughputController throughputController)
throws IOException {
return compact(throughputController, null);
}
@Override
public List<Path> compact(CompactionThroughputController throughputController, User user)
public List<Path> compact(ThroughputController throughputController, User user)
throws IOException {
return compactor.compact(request, throughputController, user);
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.util.StringUtils;
/**
@ -44,7 +45,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status) throws IOException {
MonitoredTask status, ThroughputController throughputController) throws IOException {
ArrayList<Path> result = new ArrayList<Path>();
int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
@ -71,7 +72,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
IOException e = null;
try {
performFlush(scanner, writer, smallestReadPoint);
performFlush(scanner, writer, smallestReadPoint, throughputController);
} catch (IOException ioe) {
e = ioe;
// throw the exception out

View File

@ -147,9 +147,9 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.Write
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
@ -1740,12 +1740,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (Store s : getStores()) {
CompactionContext compaction = s.requestCompaction();
if (compaction != null) {
CompactionThroughputController controller = null;
ThroughputController controller = null;
if (rsServices != null) {
controller = CompactionThroughputControllerFactory.create(rsServices, conf);
}
if (controller == null) {
controller = NoLimitCompactionThroughputController.INSTANCE;
controller = NoLimitThroughputController.INSTANCE;
}
compact(compaction, s, controller, null);
}
@ -1762,7 +1762,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (Store s : getStores()) {
CompactionContext compaction = s.requestCompaction();
if (compaction != null) {
compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
compact(compaction, s, NoLimitThroughputController.INSTANCE, null);
}
}
}
@ -1774,7 +1774,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException e
*/
@VisibleForTesting
void compactStore(byte[] family, CompactionThroughputController throughputController)
void compactStore(byte[] family, ThroughputController throughputController)
throws IOException {
Store s = getStore(family);
CompactionContext compaction = s.requestCompaction();
@ -1799,12 +1799,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return whether the compaction completed
*/
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController) throws IOException {
ThroughputController throughputController) throws IOException {
return compact(compaction, store, throughputController, null);
}
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController, User user) throws IOException {
ThroughputController throughputController, User user) throws IOException {
assert compaction != null && compaction.hasSelection();
assert !compaction.getRequest().getFiles().isEmpty();
if (this.closing.get() || this.closed.get()) {

View File

@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
@ -138,6 +139,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@ -189,6 +192,7 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import sun.misc.Signal;
import sun.misc.SignalHandler;
@ -199,7 +203,7 @@ import sun.misc.SignalHandler;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings("deprecation")
public class HRegionServer extends HasThread implements
RegionServerServices, LastSequenceId {
RegionServerServices, LastSequenceId, ConfigurationObserver {
private static final Log LOG = LogFactory.getLog(HRegionServer.class);
@ -482,6 +486,8 @@ public class HRegionServer extends HasThread implements
private CompactedHFilesDischarger compactedFileDischarger;
private volatile ThroughputController flushThroughputController;
/**
* Starts a HRegionServer at the default location.
* @param conf
@ -605,6 +611,7 @@ public class HRegionServer extends HasThread implements
putUpWebUI();
this.walRoller = new LogRoller(this, this);
this.choreService = new ChoreService(getServerName().toString(), true);
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
if (!SystemUtils.IS_OS_WINDOWS) {
Signal.handle(new Signal("HUP"), new SignalHandler() {
@ -890,6 +897,7 @@ public class HRegionServer extends HasThread implements
// Registering the compactSplitThread object with the ConfigurationManager.
configurationManager.registerObserver(this.compactSplitThread);
configurationManager.registerObserver(this.rpcServices);
configurationManager.registerObserver(this);
}
/**
@ -3360,4 +3368,28 @@ public class HRegionServer extends HasThread implements
public boolean walRollRequestFinished() {
return this.walRoller.walRollFinished();
}
@Override
public ThroughputController getFlushThroughputController() {
return flushThroughputController;
}
@Override
public double getFlushPressure() {
if (getRegionServerAccounting() == null || cacheFlusher == null) {
// return 0 during RS initialization
return 0.0;
}
return getRegionServerAccounting().getGlobalMemstoreSize() * 1.0
/ cacheFlusher.globalMemStoreLimitLowMark;
}
@Override
public void onConfigurationChange(Configuration newConf) {
ThroughputController old = this.flushThroughputController;
if (old != null) {
old.stop("configuration change");
}
this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
}
}

View File

@ -81,7 +81,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
@ -883,7 +883,7 @@ public class HStore implements Store {
/**
* Snapshot this stores memstore. Call before running
* {@link #flushCache(long, MemStoreSnapshot, MonitoredTask)}
* {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController)}
* so it has some work to do.
*/
void snapshot() {
@ -896,15 +896,16 @@ public class HStore implements Store {
}
/**
* Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
* Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
* @param logCacheFlushId flush sequence number
* @param snapshot
* @param status
* @param throughputController
* @return The path name of the tmp file to which the store was flushed
* @throws IOException
* @throws IOException if exception occurs during process
*/
protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
MonitoredTask status) throws IOException {
MonitoredTask status, ThroughputController throughputController) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
@ -914,7 +915,8 @@ public class HStore implements Store {
IOException lastException = null;
for (int i = 0; i < flushRetriesNumber; i++) {
try {
List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
List<Path> pathNames =
flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController);
Path lastPathName = null;
try {
for (Path pathName : pathNames) {
@ -1213,13 +1215,13 @@ public class HStore implements Store {
*/
@Override
public List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController) throws IOException {
ThroughputController throughputController) throws IOException {
return compact(compaction, throughputController, null);
}
@Override
public List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController, User user) throws IOException {
ThroughputController throughputController, User user) throws IOException {
assert compaction != null;
List<StoreFile> sfs = null;
CompactionRequest cr = compaction.getRequest();
@ -2267,7 +2269,10 @@ public class HStore implements Store {
@Override
public void flushCache(MonitoredTask status) throws IOException {
tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
RegionServerServices rsService = region.getRegionServerServices();
ThroughputController throughputController =
rsService == null ? null : rsService.getFlushThroughputController();
tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController);
}
@Override

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.Service;
@ -230,4 +231,16 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
* @return all the online tables in this RS
*/
Set<TableName> getOnlineTables();
/**
* @return the controller to avoid flush too fast
*/
ThroughputController getFlushThroughputController();
/**
* @return the flush pressure of all stores on this regionserver. The value should be greater than
* or equal to 0.0, and any value greater than 1.0 means we enter the emergency state that
* global memstore size already exceeds lower limit.
*/
double getFlushPressure();
}

View File

@ -22,8 +22,6 @@ import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -32,6 +30,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.io.HeapSize;
@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
/**
@ -239,14 +239,14 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
void cancelRequestedCompaction(CompactionContext compaction);
/**
* @deprecated see compact(CompactionContext, CompactionThroughputController, User)
* @deprecated see compact(CompactionContext, ThroughputController, User)
*/
@Deprecated
List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController) throws IOException;
ThroughputController throughputController) throws IOException;
List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController, User user) throws IOException;
ThroughputController throughputController, User user) throws IOException;
/**
* @return true if we should run a major compaction.

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -27,10 +28,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
/**
* Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
@ -51,10 +55,11 @@ abstract class StoreFlusher {
* @param snapshot Memstore snapshot.
* @param cacheFlushSeqNum Log cache flush sequence number.
* @param status Task that represents the flush operation and may be updated with status.
* @param throughputController A controller to avoid flush too fast
* @return List of files written. Can be empty; must not be null.
*/
public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status) throws IOException;
MonitoredTask status, ThroughputController throughputController) throws IOException;
protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
MonitoredTask status) throws IOException {
@ -104,9 +109,10 @@ abstract class StoreFlusher {
* @param scanner Scanner to get data from.
* @param sink Sink to write data to. Could be StoreFile.Writer.
* @param smallestReadPoint Smallest read point used for the flush.
* @param throughputController A controller to avoid flush too fast
*/
protected void performFlush(InternalScanner scanner,
Compactor.CellSink sink, long smallestReadPoint) throws IOException {
protected void performFlush(InternalScanner scanner, Compactor.CellSink sink,
long smallestReadPoint, ThroughputController throughputController) throws IOException {
int compactionKVMax =
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
@ -115,17 +121,36 @@ abstract class StoreFlusher {
List<Cell> kvs = new ArrayList<Cell>();
boolean hasMore;
do {
hasMore = scanner.next(kvs, scannerContext);
if (!kvs.isEmpty()) {
for (Cell c : kvs) {
// If we know that this KV is going to be included always, then let us
// set its memstoreTS to 0. This will help us save space when writing to
// disk.
sink.append(c);
String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
// no control on system table (such as meta, namespace, etc) flush
boolean control = throughputController != null && !store.getRegionInfo().isSystemTable();
if (control) {
throughputController.start(flushName);
}
try {
do {
hasMore = scanner.next(kvs, scannerContext);
if (!kvs.isEmpty()) {
for (Cell c : kvs) {
// If we know that this KV is going to be included always, then let us
// set its memstoreTS to 0. This will help us save space when writing to
// disk.
sink.append(c);
int len = KeyValueUtil.length(c);
if (control) {
throughputController.control(flushName, len);
}
}
kvs.clear();
}
kvs.clear();
} while (hasMore);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted while control throughput of flushing "
+ flushName);
} finally {
if (control) {
throughputController.finish(flushName);
}
} while (hasMore);
}
}
}

View File

@ -23,16 +23,16 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import com.google.common.base.Preconditions;
@ -100,14 +100,14 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
}
@Override
public List<Path> compact(CompactionThroughputController throughputController)
public List<Path> compact(ThroughputController throughputController)
throws IOException {
Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
return this.stripeRequest.execute(compactor, throughputController, null);
}
@Override
public List<Path> compact(CompactionThroughputController throughputController, User user)
public List<Path> compact(ThroughputController throughputController, User user)
throws IOException {
Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
return this.stripeRequest.execute(compactor, throughputController, user);

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import com.google.common.annotations.VisibleForTesting;
@ -56,7 +57,7 @@ public class StripeStoreFlusher extends StoreFlusher {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status) throws IOException {
MonitoredTask status, ThroughputController throughputController) throws IOException {
List<Path> result = new ArrayList<Path>();
int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
@ -80,7 +81,7 @@ public class StripeStoreFlusher extends StoreFlusher {
mw.init(storeScanner, factory, store.getComparator());
synchronized (flushLock) {
performFlush(scanner, mw, smallestReadPoint);
performFlush(scanner, mw, smallestReadPoint, throughputController);
result = mw.commitWriters(cacheFlushSeqNum, false);
success = true;
}

View File

@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
@ -69,10 +70,10 @@ public abstract class CompactionContext {
* Runs the compaction based on current selection. select/forceSelect must have been called.
* @return The new file paths resulting from compaction.
*/
public abstract List<Path> compact(CompactionThroughputController throughputController)
public abstract List<Path> compact(ThroughputController throughputController)
throws IOException;
public abstract List<Path> compact(CompactionThroughputController throughputController, User user)
public abstract List<Path> compact(ThroughputController throughputController, User user)
throws IOException;
public CompactionRequest getRequest() {

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.util.ReflectionUtils;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@ -33,23 +35,23 @@ public class CompactionThroughputControllerFactory {
public static final String HBASE_THROUGHPUT_CONTROLLER_KEY =
"hbase.regionserver.throughput.controller";
private static final Class<? extends CompactionThroughputController>
DEFAULT_THROUGHPUT_CONTROLLER_CLASS = NoLimitCompactionThroughputController.class;
private static final Class<? extends ThroughputController>
DEFAULT_THROUGHPUT_CONTROLLER_CLASS = NoLimitThroughputController.class;
public static CompactionThroughputController create(RegionServerServices server,
public static ThroughputController create(RegionServerServices server,
Configuration conf) {
Class<? extends CompactionThroughputController> clazz = getThroughputControllerClass(conf);
CompactionThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
Class<? extends ThroughputController> clazz = getThroughputControllerClass(conf);
ThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
controller.setup(server);
return controller;
}
public static Class<? extends CompactionThroughputController> getThroughputControllerClass(
public static Class<? extends ThroughputController> getThroughputControllerClass(
Configuration conf) {
String className =
conf.get(HBASE_THROUGHPUT_CONTROLLER_KEY, DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName());
try {
return Class.forName(className).asSubclass(CompactionThroughputController.class);
return Class.forName(className).asSubclass(ThroughputController.class);
} catch (Exception e) {
LOG.warn(
"Unable to load configured throughput controller '" + className

View File

@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -46,6 +45,8 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -267,24 +268,6 @@ public abstract class Compactor {
}
}
/**
* Used to prevent compaction name conflict when multiple compactions running parallel on the
* same store.
*/
private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
private String generateCompactionName() {
int counter;
for (;;) {
counter = NAME_COUNTER.get();
int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
if (NAME_COUNTER.compareAndSet(counter, next)) {
break;
}
}
return store.getRegionInfo().getRegionNameAsString() + "#"
+ store.getFamily().getNameAsString() + "#" + counter;
}
/**
* Performs the compaction.
* @param scanner Where to read from.
@ -295,7 +278,7 @@ public abstract class Compactor {
*/
protected boolean performCompaction(InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId,
CompactionThroughputController throughputController) throws IOException {
ThroughputController throughputController) throws IOException {
long bytesWritten = 0;
long bytesWrittenProgress = 0;
// Since scanner.next() can return 'false' but still be delivering data,
@ -306,7 +289,7 @@ public abstract class Compactor {
if (LOG.isDebugEnabled()) {
lastMillis = EnvironmentEdgeManager.currentTime();
}
String compactionName = generateCompactionName();
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
long now = 0;
boolean hasMore;
ScannerContext scannerContext =

View File

@ -34,11 +34,13 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
/**
* Compact passed set of files. Create an instance and then call
* {@link #compact(CompactionRequest, CompactionThroughputController, User)}
* {@link #compact(CompactionRequest, ThroughputController, User)}
*/
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor {
@ -52,7 +54,7 @@ public class DefaultCompactor extends Compactor {
* Do a minor/major compaction on an explicit set of storefiles from a Store.
*/
public List<Path> compact(final CompactionRequest request,
CompactionThroughputController throughputController, User user) throws IOException {
ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
this.progress = new CompactionProgress(fd.maxKeyCount);
@ -156,7 +158,7 @@ public class DefaultCompactor extends Compactor {
/**
* Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
* {@link #compact(CompactionRequest, CompactionThroughputController, User)};
* {@link #compact(CompactionRequest, ThroughputController, User)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for
* the generated {@link CompactionRequest}.
* @param isMajor true to major compact (prune all deletes, max versions, etc)
@ -168,6 +170,6 @@ public class DefaultCompactor extends Compactor {
throws IOException {
CompactionRequest cr = new CompactionRequest(filesToCompact);
cr.setIsMajor(isMajor, isMajor);
return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null);
return this.compact(cr, NoLimitThroughputController.INSTANCE, null);
}
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
@ -392,7 +393,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
public List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController) throws IOException {
ThroughputController throughputController) throws IOException {
return execute(compactor, throughputController, null);
}
/**
@ -402,7 +403,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
* @return result of compact(...)
*/
public abstract List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController, User user) throws IOException;
ThroughputController throughputController, User user) throws IOException;
public StripeCompactionRequest(CompactionRequest request) {
this.request = request;
@ -454,7 +455,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
@Override
public List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController, User user) throws IOException {
ThroughputController throughputController, User user) throws IOException {
return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
this.majorRangeToRow, throughputController, user);
}
@ -505,7 +506,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
@Override
public List<Path> execute(StripeCompactor compactor,
CompactionThroughputController throughputController, User user) throws IOException {
ThroughputController throughputController, User user) throws IOException {
return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@ -53,14 +54,14 @@ public class StripeCompactor extends Compactor {
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
ThroughputController throughputController) throws IOException {
return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow,
throughputController, null);
}
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController, User user) throws IOException {
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@ -77,14 +78,14 @@ public class StripeCompactor extends Compactor {
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController) throws IOException {
ThroughputController throughputController) throws IOException {
return compact(request, targetCount, targetSize, left, right, majorRangeFromRow,
majorRangeToRow, throughputController, null);
}
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController, User user) throws IOException {
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + targetSize
+ " target file size, no more than " + targetCount + " files, in ["
@ -98,7 +99,7 @@ public class StripeCompactor extends Compactor {
private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
byte[] majorRangeFromRow, byte[] majorRangeToRow,
CompactionThroughputController throughputController, User user) throws IOException {
ThroughputController throughputController, User user) throws IOException {
final Collection<StoreFile> filesToCompact = request.getFiles();
final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
this.progress = new CompactionProgress(fd.maxKeyCount);

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.util.ReflectionUtils;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public final class CompactionThroughputControllerFactory {
private static final Log LOG = LogFactory.getLog(CompactionThroughputControllerFactory.class);
public static final String HBASE_THROUGHPUT_CONTROLLER_KEY =
"hbase.regionserver.throughput.controller";
private CompactionThroughputControllerFactory() {
}
private static final Class<? extends ThroughputController>
DEFAULT_THROUGHPUT_CONTROLLER_CLASS = PressureAwareCompactionThroughputController.class;
// for backward compatibility and may not be supported in the future
private static final String DEPRECATED_NAME_OF_PRESSURE_AWARE_THROUGHPUT_CONTROLLER_CLASS =
"org.apache.hadoop.hbase.regionserver.compactions."
+ "PressureAwareCompactionThroughputController";
private static final String DEPRECATED_NAME_OF_NO_LIMIT_THROUGHPUT_CONTROLLER_CLASS =
"org.apache.hadoop.hbase.regionserver.compactions."
+ "NoLimitThroughputController.java";
public static ThroughputController create(RegionServerServices server,
Configuration conf) {
Class<? extends ThroughputController> clazz = getThroughputControllerClass(conf);
ThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
controller.setup(server);
return controller;
}
public static Class<? extends ThroughputController> getThroughputControllerClass(
Configuration conf) {
String className =
conf.get(HBASE_THROUGHPUT_CONTROLLER_KEY, DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName());
className = resolveDeprecatedClassName(className);
try {
return Class.forName(className).asSubclass(ThroughputController.class);
} catch (Exception e) {
LOG.warn(
"Unable to load configured throughput controller '" + className
+ "', load default throughput controller "
+ DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", e);
return DEFAULT_THROUGHPUT_CONTROLLER_CLASS;
}
}
/**
* Resolve deprecated class name to keep backward compatibiliy
* @param oldName old name of the class
* @return the new name if there is any
*/
private static String resolveDeprecatedClassName(String oldName) {
String className = oldName;
if (className.equals(DEPRECATED_NAME_OF_PRESSURE_AWARE_THROUGHPUT_CONTROLLER_CLASS)) {
className = PressureAwareCompactionThroughputController.class.getName();
} else if (className.equals(DEPRECATED_NAME_OF_NO_LIMIT_THROUGHPUT_CONTROLLER_CLASS)) {
className = NoLimitThroughputController.class.getName();
}
if (!className.equals(oldName)) {
LOG.warn(oldName + " is deprecated, please use " + className + " instead");
}
return className;
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.util.ReflectionUtils;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public final class FlushThroughputControllerFactory {
private static final Log LOG = LogFactory.getLog(FlushThroughputControllerFactory.class);
public static final String HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY =
"hbase.regionserver.flush.throughput.controller";
private static final Class<? extends ThroughputController>
DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS = NoLimitThroughputController.class;
private FlushThroughputControllerFactory() {
}
public static ThroughputController create(RegionServerServices server,
Configuration conf) {
Class<? extends ThroughputController> clazz = getThroughputControllerClass(conf);
ThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
controller.setup(server);
return controller;
}
public static Class<? extends ThroughputController> getThroughputControllerClass(
Configuration conf) {
String className =
conf.get(HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS.getName());
try {
return Class.forName(className).asSubclass(ThroughputController.class);
} catch (Exception e) {
LOG.warn(
"Unable to load configured flush throughput controller '" + className
+ "', load default throughput controller "
+ DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", e);
return DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS;
}
}
}

View File

@ -15,20 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/**
* A dummy CompactionThroughputController that does nothing.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class NoLimitCompactionThroughputController implements CompactionThroughputController {
public class NoLimitThroughputController implements ThroughputController {
public static final NoLimitCompactionThroughputController INSTANCE =
new NoLimitCompactionThroughputController();
public static final NoLimitThroughputController INSTANCE = new NoLimitThroughputController();
@Override
public void setup(RegionServerServices server) {
@ -47,7 +43,7 @@ public class NoLimitCompactionThroughputController implements CompactionThroughp
public void finish(String compactionName) {
}
private volatile boolean stopped;
private boolean stopped;
@Override
public void stop(String why) {
@ -61,6 +57,6 @@ public class NoLimitCompactionThroughputController implements CompactionThroughp
@Override
public String toString() {
return "NoLimitCompactionThroughputController";
return "NoLimitThroughputController";
}
}

View File

@ -15,21 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
/**
* A throughput controller which uses the follow schema to limit throughput
@ -37,7 +32,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* <li>If compaction pressure is greater than 1.0, no limitation.</li>
* <li>In off peak hours, use a fixed throughput limitation
* {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li>
* <li>In normal hours, the max throughput is tune between
* <li>In normal hours, the max throughput is tuned between
* {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and
* {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula &quot;lower +
* (higer - lower) * compactionPressure&quot;, where compactionPressure is in range [0.0, 1.0]</li>
@ -45,8 +40,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class PressureAwareCompactionThroughputController extends Configured implements
CompactionThroughputController, Stoppable {
public class PressureAwareCompactionThroughputController extends PressureAwareThroughputController {
private final static Log LOG = LogFactory
.getLog(PressureAwareCompactionThroughputController.class);
@ -73,51 +67,12 @@ public class PressureAwareCompactionThroughputController extends Configured impl
private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000;
/**
* Stores the information of one controlled compaction.
*/
private static final class ActiveCompaction {
private final long startTime;
private long lastControlTime;
private long lastControlSize;
private long totalSize;
private long numberOfSleeps;
private long totalSleepTime;
// prevent too many debug log
private long lastLogTime;
ActiveCompaction() {
long currentTime = EnvironmentEdgeManager.currentTime();
this.startTime = currentTime;
this.lastControlTime = currentTime;
this.lastLogTime = currentTime;
}
}
private long maxThroughputHigherBound;
private long maxThroughputLowerBound;
// check compaction throughput every this size
private static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL =
"hbase.hstore.compaction.throughput.control.check.interval";
private long maxThroughputOffpeak;
private OffPeakHours offPeakHours;
private long controlPerSize;
private int tuningPeriod;
volatile double maxThroughput;
private final ConcurrentMap<String, ActiveCompaction> activeCompactions =
new ConcurrentHashMap<String, ActiveCompaction>();
@Override
public void setup(final RegionServerServices server) {
server.getChoreService().scheduleChore(
@ -141,14 +96,14 @@ public class PressureAwareCompactionThroughputController extends Configured impl
// compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
// calculate the throughput limitation.
maxThroughputToSet =
maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound)
maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound)
* compactionPressure;
}
if (LOG.isDebugEnabled()) {
LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
+ throughputDesc(maxThroughputToSet));
}
this.maxThroughput = maxThroughputToSet;
this.setMaxThroughput(maxThroughputToSet);
}
@Override
@ -157,7 +112,7 @@ public class PressureAwareCompactionThroughputController extends Configured impl
if (conf == null) {
return;
}
this.maxThroughputHigherBound =
this.maxThroughputUpperBound =
conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND);
this.maxThroughputLowerBound =
@ -167,97 +122,32 @@ public class PressureAwareCompactionThroughputController extends Configured impl
conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK,
DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK);
this.offPeakHours = OffPeakHours.getInstance(conf);
this.controlPerSize = this.maxThroughputLowerBound;
this.maxThroughput = this.maxThroughputLowerBound;
this.controlPerSize =
conf.getLong(HBASE_HSTORE_COMPACTION_THROUGHPUT_CONTROL_CHECK_INTERVAL,
this.maxThroughputLowerBound);
this.setMaxThroughput(this.maxThroughputLowerBound);
this.tuningPeriod =
getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD);
LOG.info("Compaction throughput configurations, higher bound: "
+ throughputDesc(maxThroughputHigherBound) + ", lower bound "
+ throughputDesc(maxThroughputUpperBound) + ", lower bound "
+ throughputDesc(maxThroughputLowerBound) + ", off peak: "
+ throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms");
}
private String throughputDesc(long deltaSize, long elapsedTime) {
return throughputDesc((double) deltaSize / elapsedTime * 1000);
}
private String throughputDesc(double speed) {
if (speed >= 1E15) { // large enough to say it is unlimited
return "unlimited";
} else {
return String.format("%.2f MB/sec", speed / 1024 / 1024);
}
}
@Override
public void start(String compactionName) {
activeCompactions.put(compactionName, new ActiveCompaction());
}
@Override
public long control(String compactionName, long size) throws InterruptedException {
ActiveCompaction compaction = activeCompactions.get(compactionName);
compaction.totalSize += size;
long deltaSize = compaction.totalSize - compaction.lastControlSize;
if (deltaSize < controlPerSize) {
return 0;
}
long now = EnvironmentEdgeManager.currentTime();
double maxThroughputPerCompaction = this.maxThroughput / activeCompactions.size();
long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms
long elapsedTime = now - compaction.lastControlTime;
compaction.lastControlSize = compaction.totalSize;
if (elapsedTime >= minTimeAllowed) {
compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
return 0;
}
// too fast
long sleepTime = minTimeAllowed - elapsedTime;
if (LOG.isDebugEnabled()) {
// do not log too much
if (now - compaction.lastLogTime > 60L * 1000) {
LOG.debug(compactionName + " sleep " + sleepTime + " ms because current throughput is "
+ throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
+ throughputDesc(maxThroughputPerCompaction) + ", already slept "
+ compaction.numberOfSleeps + " time(s) and total slept time is "
+ compaction.totalSleepTime + " ms till now.");
compaction.lastLogTime = now;
}
}
Thread.sleep(sleepTime);
compaction.numberOfSleeps++;
compaction.totalSleepTime += sleepTime;
compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
return sleepTime;
}
@Override
public void finish(String compactionName) {
ActiveCompaction compaction = activeCompactions.remove(compactionName);
long elapsedTime = Math.max(1, EnvironmentEdgeManager.currentTime() - compaction.startTime);
LOG.info(compactionName + " average throughput is "
+ throughputDesc(compaction.totalSize, elapsedTime) + ", slept "
+ compaction.numberOfSleeps + " time(s) and total slept time is "
+ compaction.totalSleepTime + " ms. " + activeCompactions.size()
+ " active compactions remaining, total limit is " + throughputDesc(maxThroughput));
}
private volatile boolean stopped = false;
@Override
public void stop(String why) {
stopped = true;
}
@Override
public boolean isStopped() {
return stopped;
}
@Override
public String toString() {
return "DefaultCompactionThroughputController [maxThroughput=" + throughputDesc(maxThroughput)
+ ", activeCompactions=" + activeCompactions.size() + "]";
return "DefaultCompactionThroughputController [maxThroughput="
+ throughputDesc(getMaxThroughput()) + ", activeCompactions=" + activeOperations.size()
+ "]";
}
@Override
protected boolean skipControl(long deltaSize, long controlSize) {
if (deltaSize < controlSize) {
return true;
} else {
return false;
}
}
}

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
/**
* A throughput controller which uses the follow schema to limit throughput
* <ul>
* <li>If flush pressure is greater than or equal to 1.0, no limitation.</li>
* <li>In normal case, the max throughput is tuned between
* {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND} and
* {@value #HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND}, using the formula &quot;lower +
* (upper - lower) * flushPressure&quot;, where flushPressure is in range [0.0, 1.0)</li>
* </ul>
* @see org.apache.hadoop.hbase.regionserver.HRegionServer#getFlushPressure()
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class PressureAwareFlushThroughputController extends PressureAwareThroughputController {
private static final Log LOG = LogFactory.getLog(PressureAwareFlushThroughputController.class);
public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND =
"hbase.hstore.flush.throughput.upper.bound";
private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND =
200L * 1024 * 1024;
public static final String HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND =
"hbase.hstore.flush.throughput.lower.bound";
private static final long DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND =
100L * 1024 * 1024;
public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD =
"hbase.hstore.flush.throughput.tune.period";
private static final int DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD = 20 * 1000;
// check flush throughput every this size
public static final String HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL =
"hbase.hstore.flush.throughput.control.check.interval";
private static final long DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL =
10L * 1024 * 1024;// 10MB
@Override
public void setup(final RegionServerServices server) {
server.getChoreService().scheduleChore(
new ScheduledChore("FlushThroughputTuner", this, tuningPeriod, this.tuningPeriod) {
@Override
protected void chore() {
tune(server.getFlushPressure());
}
});
}
private void tune(double flushPressure) {
double maxThroughputToSet;
if (flushPressure >= 1.0) {
// set to unlimited if global memstore size already exceeds lower limit
maxThroughputToSet = Double.MAX_VALUE;
} else {
// flushPressure is between 0.0 and 1.0, we use a simple linear formula to
// calculate the throughput limitation.
maxThroughputToSet =
maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound)
* flushPressure;
}
if (LOG.isDebugEnabled()) {
LOG.debug("flushPressure is " + flushPressure + ", tune flush throughput to "
+ throughputDesc(maxThroughputToSet));
}
this.setMaxThroughput(maxThroughputToSet);
}
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
if (conf == null) {
return;
}
this.maxThroughputUpperBound =
conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND,
DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND);
this.maxThroughputLowerBound =
conf.getLong(HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND,
DEFAULT_HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND);
this.offPeakHours = OffPeakHours.getInstance(conf);
this.controlPerSize =
conf.getLong(HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL,
DEFAULT_HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL);
this.setMaxThroughput(this.maxThroughputLowerBound);
this.tuningPeriod =
getConf().getInt(HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
DEFAULT_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD);
LOG.info("Flush throughput configurations, upper bound: "
+ throughputDesc(maxThroughputUpperBound) + ", lower bound "
+ throughputDesc(maxThroughputLowerBound) + ", tuning period: " + tuningPeriod + " ms");
}
@Override
public String toString() {
return "DefaultFlushController [maxThroughput=" + throughputDesc(getMaxThroughput())
+ ", activeFlushNumber=" + activeOperations.size() + "]";
}
@Override
protected boolean skipControl(long deltaSize, long controlSize) {
// for flush, we control the flow no matter whether the flush size is small
return false;
}
}

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public abstract class PressureAwareThroughputController extends Configured implements
ThroughputController, Stoppable {
private static final Log LOG = LogFactory.getLog(PressureAwareThroughputController.class);
/**
* Stores the information of one controlled compaction.
*/
private static final class ActiveOperation {
private final long startTime;
private long lastControlTime;
private long lastControlSize;
private long totalSize;
private long numberOfSleeps;
private long totalSleepTime;
// prevent too many debug log
private long lastLogTime;
ActiveOperation() {
long currentTime = EnvironmentEdgeManager.currentTime();
this.startTime = currentTime;
this.lastControlTime = currentTime;
this.lastLogTime = currentTime;
}
}
protected long maxThroughputUpperBound;
protected long maxThroughputLowerBound;
protected OffPeakHours offPeakHours;
protected long controlPerSize;
protected int tuningPeriod;
private volatile double maxThroughput;
protected final ConcurrentMap<String, ActiveOperation> activeOperations =
new ConcurrentHashMap<String, ActiveOperation>();
@Override
public abstract void setup(final RegionServerServices server);
protected String throughputDesc(long deltaSize, long elapsedTime) {
return throughputDesc((double) deltaSize / elapsedTime * 1000);
}
protected String throughputDesc(double speed) {
if (speed >= 1E15) { // large enough to say it is unlimited
return "unlimited";
} else {
return String.format("%.2f MB/sec", speed / 1024 / 1024);
}
}
@Override
public void start(String opName) {
activeOperations.put(opName, new ActiveOperation());
}
@Override
public long control(String opName, long size) throws InterruptedException {
ActiveOperation operation = activeOperations.get(opName);
operation.totalSize += size;
long deltaSize = operation.totalSize - operation.lastControlSize;
if (deltaSize < controlPerSize) {
return 0;
}
long now = EnvironmentEdgeManager.currentTime();
double maxThroughputPerCompaction = this.getMaxThroughput() / activeOperations.size();
long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms
long elapsedTime = now - operation.lastControlTime;
operation.lastControlSize = operation.totalSize;
if (elapsedTime >= minTimeAllowed) {
operation.lastControlTime = EnvironmentEdgeManager.currentTime();
return 0;
}
// too fast
long sleepTime = minTimeAllowed - elapsedTime;
if (LOG.isDebugEnabled()) {
// do not log too much
if (now - operation.lastLogTime > 5L * 1000) {
LOG.debug("deltaSize: " + deltaSize + " bytes; elapseTime: " + elapsedTime + " ns");
LOG.debug(opName + " sleep " + sleepTime + " ms because current throughput is "
+ throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
+ throughputDesc(maxThroughputPerCompaction) + ", already slept "
+ operation.numberOfSleeps + " time(s) and total slept time is "
+ operation.totalSleepTime + " ms till now.");
operation.lastLogTime = now;
}
}
Thread.sleep(sleepTime);
operation.numberOfSleeps++;
operation.totalSleepTime += sleepTime;
operation.lastControlTime = EnvironmentEdgeManager.currentTime();
return sleepTime;
}
/**
* Check whether to skip control given delta size and control size
* @param deltaSize Delta size since last control
* @param controlSize Size limit to perform control
* @return a boolean indicates whether to skip this control
*/
protected abstract boolean skipControl(long deltaSize, long controlSize);
@Override
public void finish(String opName) {
ActiveOperation operation = activeOperations.remove(opName);
long elapsedTime = EnvironmentEdgeManager.currentTime() - operation.startTime;
LOG.info(opName + " average throughput is "
+ throughputDesc(operation.totalSize, elapsedTime) + ", slept "
+ operation.numberOfSleeps + " time(s) and total slept time is "
+ operation.totalSleepTime + " ms. " + activeOperations.size()
+ " active operations remaining, total limit is " + throughputDesc(getMaxThroughput()));
}
private volatile boolean stopped = false;
@Override
public void stop(String why) {
stopped = true;
}
@Override
public boolean isStopped() {
return stopped;
}
public double getMaxThroughput() {
return maxThroughput;
}
public void setMaxThroughput(double maxThroughput) {
this.maxThroughput = maxThroughput;
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.Store;
/**
* Helper methods for throttling
*/
@InterfaceAudience.Private
public final class ThroughputControlUtil {
private ThroughputControlUtil() {
}
private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
private static final String NAME_DELIMITER = "#";
/**
* Generate a name for throttling, to prevent name conflict when multiple IO operation running
* parallel on the same store.
* @param store the Store instance on which IO operation is happening
* @param opName Name of the IO operation, e.g. "flush", "compaction", etc.
* @return The name for throttling
*/
public static String getNameForThrottling(final Store store, final String opName) {
int counter;
for (;;) {
counter = NAME_COUNTER.get();
int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
if (NAME_COUNTER.compareAndSet(counter, next)) {
break;
}
}
return store.getRegionInfo().getRegionNameAsString() + NAME_DELIMITER
+ store.getFamily().getNameAsString() + NAME_DELIMITER + opName + NAME_DELIMITER + counter;
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
package org.apache.hadoop.hbase.regionserver.throttle;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Stoppable;
@ -23,11 +23,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/**
* A utility that constrains the total throughput of one or more simultaneous flows (compactions) by
* A utility that constrains the total throughput of one or more simultaneous flows by
* sleeping when necessary.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public interface CompactionThroughputController extends Stoppable {
public interface ThroughputController extends Stoppable {
/**
* Setup controller for the given region server.
@ -35,18 +35,18 @@ public interface CompactionThroughputController extends Stoppable {
void setup(RegionServerServices server);
/**
* Start a compaction.
* Start the throughput controller.
*/
void start(String compactionName);
void start(String name);
/**
* Control the compaction throughput. Will sleep if too fast.
* Control the throughput. Will sleep if too fast.
* @return the actual sleep time.
*/
long control(String compactionName, long size) throws InterruptedException;
long control(String name, long size) throws InterruptedException;
/**
* Finish a compaction. Should call this method in a finally block.
* Finish the controller. Should call this method in a finally block.
*/
void finish(String compactionName);
void finish(String name);
}

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@ -309,4 +310,14 @@ public class MockRegionServerServices implements RegionServerServices {
public double getCompactionPressure() {
return 0;
}
@Override
public ThroughputController getFlushThroughputController() {
return null;
}
@Override
public double getFlushPressure() {
return 0;
}
}

View File

@ -44,10 +44,10 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
@ -121,7 +121,7 @@ public class TestIOFencing {
@Override
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController) throws IOException {
ThroughputController throughputController) throws IOException {
try {
return super.compact(compaction, store, throughputController);
} finally {
@ -131,7 +131,7 @@ public class TestIOFencing {
@Override
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController, User user) throws IOException {
ThroughputController throughputController, User user) throws IOException {
try {
return super.compact(compaction, store, throughputController, user);
} finally {

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
@ -62,10 +61,11 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -232,7 +232,7 @@ public class TestRegionObserverScannerOpenHook {
@Override
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController) throws IOException {
ThroughputController throughputController) throws IOException {
boolean ret = super.compact(compaction, store, throughputController);
if (ret) compactionStateChangeLatch.countDown();
return ret;
@ -240,7 +240,7 @@ public class TestRegionObserverScannerOpenHook {
@Override
public boolean compact(CompactionContext compaction, Store store,
CompactionThroughputController throughputController, User user) throws IOException {
ThroughputController throughputController, User user) throws IOException {
boolean ret = super.compact(compaction, store, throughputController, user);
if (ret) compactionStateChangeLatch.countDown();
return ret;

View File

@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@ -654,4 +655,14 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
public double getCompactionPressure() {
return 0;
}
@Override
public ThroughputController getFlushThroughputController() {
return null;
}
@Override
public double getFlushPressure() {
return 0;
}
}

View File

@ -22,8 +22,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;

View File

@ -60,9 +60,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@ -106,7 +106,7 @@ public class TestCompaction {
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitCompactionThroughputController.class.getName());
NoLimitThroughputController.class.getName());
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
secondRowBytes = START_KEY_BYTES.clone();
@ -365,13 +365,13 @@ public class TestCompaction {
}
@Override
public List<Path> compact(CompactionThroughputController throughputController)
public List<Path> compact(ThroughputController throughputController)
throws IOException {
return compact(throughputController, null);
}
@Override
public List<Path> compact(CompactionThroughputController throughputController, User user)
public List<Path> compact(ThroughputController throughputController, User user)
throws IOException {
finishCompaction(this.selectedFiles);
return new ArrayList<Path>();
@ -423,13 +423,13 @@ public class TestCompaction {
}
@Override
public List<Path> compact(CompactionThroughputController throughputController)
public List<Path> compact(ThroughputController throughputController)
throws IOException {
return compact(throughputController, null);
}
@Override
public List<Path> compact(CompactionThroughputController throughputController, User user)
public List<Path> compact(ThroughputController throughputController, User user)
throws IOException {
try {
isInCompact = true;
@ -512,10 +512,10 @@ public class TestCompaction {
HRegion r = mock(HRegion.class);
when(
r.compact(any(CompactionContext.class), any(Store.class),
any(CompactionThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
any(ThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
public Boolean answer(InvocationOnMock invocation) throws Throwable {
invocation.getArgumentAt(0, CompactionContext.class).compact(
invocation.getArgumentAt(2, CompactionThroughputController.class));
invocation.getArgumentAt(2, ThroughputController.class));
return true;
}
});

View File

@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -314,7 +314,7 @@ public class TestHRegionReplayEvents {
// compaction from primary
LOG.info("-- Compacting primary, only 1 store");
primaryRegion.compactStore(Bytes.toBytes("cf1"),
NoLimitCompactionThroughputController.INSTANCE);
NoLimitThroughputController.INSTANCE);
// now replay the edits and the flush marker
reader = createWALReaderForPrimary();

View File

@ -88,8 +88,7 @@ import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -372,7 +371,7 @@ public class TestSplitTransactionOnCluster {
region.initialize();
// 2, Run Compaction cc
assertFalse(region.compact(cc, store, NoLimitCompactionThroughputController.INSTANCE));
assertFalse(region.compact(cc, store, NoLimitThroughputController.INSTANCE));
assertTrue(fileNum > store.getStorefiles().size());
// 3, Split

View File

@ -69,7 +69,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.security.User;
@ -379,7 +379,7 @@ public class TestStore {
Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
// after compact; check the lowest time stamp
store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE);
store.compact(store.requestCompaction(), NoLimitThroughputController.INSTANCE);
lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);

View File

@ -51,8 +51,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
@ -131,7 +131,7 @@ public class TestStripeCompactor {
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths =
sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
NoLimitCompactionThroughputController.INSTANCE);
NoLimitThroughputController.INSTANCE);
writers.verifyKvs(output, allFiles, true);
if (allFiles) {
assertEquals(output.length, paths.size());
@ -168,7 +168,7 @@ public class TestStripeCompactor {
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths =
sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
NoLimitCompactionThroughputController.INSTANCE);
NoLimitThroughputController.INSTANCE);
assertEquals(output.length, paths.size());
writers.verifyKvs(output, true, true);
List<byte[]> boundaries = new ArrayList<byte[]>();

View File

@ -37,10 +37,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
@ -76,7 +76,7 @@ public class TestStripeStoreEngine {
when(
mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class),
any(byte[].class), any(byte[].class), any(byte[].class),
any(CompactionThroughputController.class), any(User.class)))
any(ThroughputController.class), any(User.class)))
.thenReturn(new ArrayList<Path>());
// Produce 3 L0 files.
@ -95,10 +95,10 @@ public class TestStripeStoreEngine {
assertEquals(2, compaction.getRequest().getFiles().size());
assertFalse(compaction.getRequest().getFiles().contains(sf));
// Make sure the correct method it called on compactor.
compaction.compact(NoLimitCompactionThroughputController.INSTANCE);
compaction.compact(NoLimitThroughputController.INSTANCE);
verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null,
NoLimitCompactionThroughputController.INSTANCE, null);
NoLimitThroughputController.INSTANCE, null);
}
private static StoreFile createFile() throws Exception {

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -212,10 +213,10 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
any(NoLimitCompactionThroughputController.class), any(User.class));
any(NoLimitThroughputController.class), any(User.class));
}
@Test
@ -457,7 +458,7 @@ public class TestStripeCompactionPolicy {
// All the Stripes are expired, so the Compactor will not create any Writers. We need to create
// an empty file to preserve metadata
StripeCompactor sc = createCompactor();
List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
List<Path> paths = scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
assertEquals(1, paths.size());
}
@ -516,7 +517,7 @@ public class TestStripeCompactionPolicy {
assertTrue(policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
@Override
public boolean matches(Object argument) {
@ -530,7 +531,7 @@ public class TestStripeCompactionPolicy {
}
}), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
any(NoLimitCompactionThroughputController.class), any(User.class));
any(NoLimitThroughputController.class), any(User.class));
}
/**
@ -551,12 +552,12 @@ public class TestStripeCompactionPolicy {
assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
scr.execute(sc, NoLimitThroughputController.INSTANCE, null);
verify(sc, times(1)).compact(eq(scr.getRequest()),
count == null ? anyInt() : eq(count.intValue()),
size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
any(NoLimitCompactionThroughputController.class), any(User.class));
any(NoLimitThroughputController.class), any(User.class));
}
/** Verify arbitrary flush. */

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
package org.apache.hadoop.hbase.regionserver.throttle;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -32,14 +32,12 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
@ -47,13 +45,18 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCompactionWithThroughputController {
private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class);
@ -81,18 +84,18 @@ public class TestCompactionWithThroughputController {
}
private Store prepareData() throws IOException {
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
Admin admin = TEST_UTIL.getHBaseAdmin();
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
HTable table = TEST_UTIL.createTable(tableName, family);
Table table = TEST_UTIL.createTable(tableName, family);
Random rand = new Random();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
byte[] value = new byte[128 * 1024];
rand.nextBytes(value);
table.put(new Put(Bytes.toBytes(i * 10 + j)).add(family, qualifier, value));
table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
}
admin.flush(tableName);
}
@ -107,10 +110,12 @@ public class TestCompactionWithThroughputController {
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
conf.setLong(
PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
PressureAwareCompactionThroughputController
.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
throughputLimit);
conf.setLong(
PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
PressureAwareCompactionThroughputController
.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
throughputLimit);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
PressureAwareCompactionThroughputController.class.getName());
@ -142,7 +147,7 @@ public class TestCompactionWithThroughputController {
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitCompactionThroughputController.class.getName());
NoLimitThroughputController.class.getName());
TEST_UTIL.startMiniCluster(1);
try {
Store store = prepareData();
@ -177,10 +182,12 @@ public class TestCompactionWithThroughputController {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
conf.setLong(
PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
PressureAwareCompactionThroughputController
.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
20L * 1024 * 1024);
conf.setLong(
PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
PressureAwareCompactionThroughputController
.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
10L * 1024 * 1024);
conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
@ -201,30 +208,34 @@ public class TestCompactionWithThroughputController {
PressureAwareCompactionThroughputController throughputController =
(PressureAwareCompactionThroughputController) regionServer.compactSplitThread
.getCompactionThroughputController();
assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
Table table = conn.getTable(tableName);
for (int i = 0; i < 5; i++) {
table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
byte[] value = new byte[0];
table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value));
TEST_UTIL.flush(tableName);
}
Thread.sleep(2000);
assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
byte[] value1 = new byte[0];
table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1));
TEST_UTIL.flush(tableName);
Thread.sleep(2000);
assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
byte[] value = new byte[0];
table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value));
TEST_UTIL.flush(tableName);
Thread.sleep(2000);
assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON);
assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON);
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitCompactionThroughputController.class.getName());
NoLimitThroughputController.class.getName());
regionServer.compactSplitThread.onConfigurationChange(conf);
assertTrue(throughputController.isStopped());
assertTrue(regionServer.compactSplitThread.getCompactionThroughputController() instanceof NoLimitCompactionThroughputController);
assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
instanceof NoLimitThroughputController);
} finally {
conn.close();
TEST_UTIL.shutdownMiniCluster();
@ -255,27 +266,35 @@ public class TestCompactionWithThroughputController {
assertEquals(0.0, store.getCompactionPressure(), EPSILON);
Table table = conn.getTable(tableName);
for (int i = 0; i < 4; i++) {
table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
table.put(new Put(Bytes.toBytes(100 + i)).add(family, qualifier, new byte[0]));
byte[] value1 = new byte[0];
table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value1));
byte[] value = new byte[0];
table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value));
TEST_UTIL.flush(tableName);
}
assertEquals(8, store.getStorefilesCount());
assertEquals(0.0, store.getCompactionPressure(), EPSILON);
table.put(new Put(Bytes.toBytes(4)).add(family, qualifier, new byte[0]));
table.put(new Put(Bytes.toBytes(104)).add(family, qualifier, new byte[0]));
byte[] value5 = new byte[0];
table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value5));
byte[] value4 = new byte[0];
table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value4));
TEST_UTIL.flush(tableName);
assertEquals(10, store.getStorefilesCount());
assertEquals(0.5, store.getCompactionPressure(), EPSILON);
table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
table.put(new Put(Bytes.toBytes(105)).add(family, qualifier, new byte[0]));
byte[] value3 = new byte[0];
table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value3));
byte[] value2 = new byte[0];
table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value2));
TEST_UTIL.flush(tableName);
assertEquals(12, store.getStorefilesCount());
assertEquals(1.0, store.getCompactionPressure(), EPSILON);
table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
table.put(new Put(Bytes.toBytes(106)).add(family, qualifier, new byte[0]));
byte[] value1 = new byte[0];
table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value1));
byte[] value = new byte[0];
table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value));
TEST_UTIL.flush(tableName);
assertEquals(14, store.getStorefilesCount());
assertEquals(2.0, store.getCompactionPressure(), EPSILON);

View File

@ -0,0 +1,217 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestFlushWithThroughputController {
private static final Log LOG = LogFactory.getLog(TestFlushWithThroughputController.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final double EPSILON = 1E-6;
private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
private final byte[] family = Bytes.toBytes("f");
private final byte[] qualifier = Bytes.toBytes("q");
private Store getStoreWithName(TableName tableName) {
MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
HRegionServer hrs = rsts.get(i).getRegionServer();
for (Region region : hrs.getOnlineRegions(tableName)) {
return region.getStores().iterator().next();
}
}
return null;
}
private Store generateAndFlushData() throws IOException {
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
HTable table = TEST_UTIL.createTable(tableName, family);
Random rand = new Random();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
byte[] value = new byte[256 * 1024];
rand.nextBytes(value);
table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
}
admin.flush(tableName);
}
return getStoreWithName(tableName);
}
private long testFlushWithThroughputLimit() throws Exception {
long throughputLimit = 1L * 1024 * 1024;
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
PressureAwareFlushThroughputController.class.getName());
conf.setLong(
PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND,
throughputLimit);
conf.setLong(
PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND,
throughputLimit);
conf.setLong(
PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL,
throughputLimit);
TEST_UTIL.startMiniCluster(1);
try {
long startTime = System.nanoTime();
Store store = generateAndFlushData();
assertEquals(10, store.getStorefilesCount());
long duration = System.nanoTime() - startTime;
double throughput = (double) store.getStorefilesSize() / duration * 1000 * 1000 * 1000;
LOG.debug("Throughput is: " + (throughput / 1024 / 1024) + " MB/s");
// confirm that the speed limit work properly(not too fast, and also not too slow)
// 20% is the max acceptable error rate.
assertTrue(throughput < throughputLimit * 1.2);
assertTrue(throughput > throughputLimit * 0.8);
return duration;
} finally {
TEST_UTIL.shutdownMiniCluster();
}
}
private long testFlushWithoutThroughputLimit() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
NoLimitThroughputController.class.getName());
TEST_UTIL.startMiniCluster(1);
try {
long startTime = System.nanoTime();
Store store = generateAndFlushData();
assertEquals(10, store.getStorefilesCount());
long duration = System.nanoTime() - startTime;
double throughput = (double) store.getStorefilesSize() / duration * 1000 * 1000 * 1000;
LOG.debug("Throughput w/o limit is: " + (throughput / 1024 / 1024) + " MB/s");
return duration;
} finally {
TEST_UTIL.shutdownMiniCluster();
}
}
@Test
public void testFlushControl() throws Exception {
long limitTime = testFlushWithThroughputLimit();
long noLimitTime = testFlushWithoutThroughputLimit();
LOG.info("With 1M/s limit, flush use " + (limitTime / 1000000)
+ "ms; without limit, flush use " + (noLimitTime / 1000000) + "ms");
// Commonly if multiple region flush at the same time, the throughput could be very high
// but flush in this test is in serial, so we use a weak assumption.
assertTrue(limitTime > 2 * noLimitTime);
}
/**
* Test the tuning task of {@link PressureAwareFlushThroughputController}
*/
@Test
public void testFlushThroughputTuning() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
conf.setLong(
PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND,
20L * 1024 * 1024);
conf.setLong(
PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND,
10L * 1024 * 1024);
conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
PressureAwareFlushThroughputController.class.getName());
conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
3000);
TEST_UTIL.startMiniCluster(1);
assertEquals(10L * 1024 * 1024,
((PressureAwareThroughputController) TEST_UTIL.getMiniHBaseCluster().getRegionServer(0)
.getFlushThroughputController()).getMaxThroughput(), EPSILON);
Connection conn = ConnectionFactory.createConnection(conf);
try {
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(family));
htd.setCompactionEnabled(false);
TEST_UTIL.getHBaseAdmin().createTable(htd);
TEST_UTIL.waitTableAvailable(tableName);
HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
PressureAwareFlushThroughputController throughputController =
(PressureAwareFlushThroughputController) regionServer.getFlushThroughputController();
Table table = conn.getTable(tableName);
Random rand = new Random();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
byte[] value = new byte[256 * 1024];
rand.nextBytes(value);
table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
}
}
Thread.sleep(5000);
double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure());
assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON);
conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
NoLimitThroughputController.class.getName());
regionServer.onConfigurationChange(conf);
assertTrue(throughputController.isStopped());
assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitThroughputController);
} finally {
conn.close();
TEST_UTIL.shutdownMiniCluster();
}
}
/**
* Test the logic for striped store.
*/
@Test
public void testFlushControlForStripedStore() throws Exception {
TEST_UTIL.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY,
StripeStoreEngine.class.getName());
testFlushControl();
}
}

View File

@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@ -66,8 +65,21 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.*;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -647,11 +659,11 @@ public class TestWALReplay {
}
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status) throws IOException {
MonitoredTask status, ThroughputController throughputController) throws IOException {
if (throwExceptionWhenFlushing.get()) {
throw new IOException("Simulated exception by tests");
}
return super.flushSnapshot(snapshot, cacheFlushId, status);
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController);
}
};