HBASE-19389 Limit concurrency of put with dense (hundreds) columns to prevent write handler exhausted
Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
parent
31978c31bb
commit
98ac4f12b5
|
@ -89,6 +89,7 @@ public final class HConstants {
|
|||
NOT_RUN,
|
||||
SUCCESS,
|
||||
BAD_FAMILY,
|
||||
STORE_TOO_BUSY,
|
||||
SANITY_CHECK_FAILURE,
|
||||
FAILURE
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -146,6 +146,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
|||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
@ -674,6 +675,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
|
||||
Bytes.BYTES_COMPARATOR);
|
||||
|
||||
private final StoreHotnessProtector storeHotnessProtector;
|
||||
|
||||
/**
|
||||
* HRegion constructor. This constructor should only be used for testing and
|
||||
* extensions. Instances of HRegion should be instantiated with the
|
||||
|
@ -794,6 +797,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
|
||||
this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ?
|
||||
DEFAULT_DURABILITY : htd.getDurability();
|
||||
|
||||
this.storeHotnessProtector = new StoreHotnessProtector(this, conf);
|
||||
|
||||
if (rsServices != null) {
|
||||
this.rsAccounting = this.rsServices.getRegionServerAccounting();
|
||||
// don't initialize coprocessors if not running within a regionserver
|
||||
|
@ -806,8 +812,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
this.metricsRegion = null;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
// Write out region name as string and its encoded name.
|
||||
LOG.debug("Instantiated " + this);
|
||||
// Write out region name, its encoded name and storeHotnessProtector as string.
|
||||
LOG.debug("Instantiated " + this +"; "+ storeHotnessProtector.toString());
|
||||
}
|
||||
|
||||
configurationManager = Optional.empty();
|
||||
|
@ -3180,9 +3186,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
if (!isOperationPending(lastIndexExclusive)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// HBASE-19389 Limit concurrency of put with dense (hundreds) columns to avoid exhausting
|
||||
// RS handlers, covering both MutationBatchOperation and ReplayBatchOperation
|
||||
// The BAD_FAMILY/SANITY_CHECK_FAILURE cases are handled in checkAndPrepare phase and won't
|
||||
// pass the isOperationPending check
|
||||
Map<byte[], List<Cell>> curFamilyCellMap =
|
||||
getMutation(lastIndexExclusive).getFamilyCellMap();
|
||||
try {
|
||||
// start the protector before acquiring row lock considering performance, and will finish
|
||||
// it when encountering exception
|
||||
region.storeHotnessProtector.start(curFamilyCellMap);
|
||||
} catch (RegionTooBusyException rtbe) {
|
||||
region.storeHotnessProtector.finish(curFamilyCellMap);
|
||||
if (isAtomic()) {
|
||||
throw rtbe;
|
||||
}
|
||||
retCodeDetails[lastIndexExclusive] =
|
||||
new OperationStatus(OperationStatusCode.STORE_TOO_BUSY, rtbe.getMessage());
|
||||
continue;
|
||||
}
|
||||
|
||||
Mutation mutation = getMutation(lastIndexExclusive);
|
||||
// If we haven't got any rows in our batch, we should block to get the next one.
|
||||
RowLock rowLock = null;
|
||||
boolean throwException = false;
|
||||
try {
|
||||
// if atomic then get exclusive lock, else shared lock
|
||||
rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
|
||||
|
@ -3190,16 +3218,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// NOTE: We will retry when other exceptions, but we should stop if we receive
|
||||
// TimeoutIOException or InterruptedIOException as operation has timed out or
|
||||
// interrupted respectively.
|
||||
throwException = true;
|
||||
throw e;
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
|
||||
if (isAtomic()) { // fail, atomic means all or none
|
||||
throwException = true;
|
||||
throw ioe;
|
||||
}
|
||||
} catch (Throwable throwable) {
|
||||
throwException = true;
|
||||
throw throwable;
|
||||
} finally {
|
||||
if (throwException) {
|
||||
region.storeHotnessProtector.finish(curFamilyCellMap);
|
||||
}
|
||||
}
|
||||
if (rowLock == null) {
|
||||
// We failed to grab another lock
|
||||
if (isAtomic()) {
|
||||
region.storeHotnessProtector.finish(curFamilyCellMap);
|
||||
throw new IOException("Can't apply all operations atomically!");
|
||||
}
|
||||
break; // Stop acquiring more rows for this batch
|
||||
|
@ -3285,7 +3323,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
public void doPostOpCleanupForMiniBatch(
|
||||
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit,
|
||||
boolean success) throws IOException {}
|
||||
boolean success) throws IOException {
|
||||
doFinishHotnessProtector(miniBatchOp);
|
||||
}
|
||||
|
||||
private void doFinishHotnessProtector(
|
||||
final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
|
||||
// check and return if the protector is not enabled
|
||||
if (!region.storeHotnessProtector.isEnable()) {
|
||||
return;
|
||||
}
|
||||
// miniBatchOp is null, if and only if lockRowsAndBuildMiniBatch throwing exception.
|
||||
// This case was handled.
|
||||
if (miniBatchOp == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final int finalLastIndexExclusive = miniBatchOp.getLastIndexExclusive();
|
||||
|
||||
for (int i = nextIndexToProcess; i < finalLastIndexExclusive; i++) {
|
||||
switch (retCodeDetails[i].getOperationStatusCode()) {
|
||||
case SUCCESS:
|
||||
case FAILURE:
|
||||
region.storeHotnessProtector.finish(getMutation(i).getFamilyCellMap());
|
||||
break;
|
||||
default:
|
||||
// do nothing
|
||||
// We won't start the protector for NOT_RUN/BAD_FAMILY/SANITY_CHECK_FAILURE and the
|
||||
// STORE_TOO_BUSY case is handled in StoreHotnessProtector#start
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically apply the given map of family->edits to the memstore.
|
||||
|
@ -3504,6 +3573,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
@Override
|
||||
public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
|
||||
final WALEdit walEdit, boolean success) throws IOException {
|
||||
|
||||
super.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, success);
|
||||
if (miniBatchOp != null) {
|
||||
// synced so that the coprocessor contract is adhered to.
|
||||
if (region.coprocessorHost != null) {
|
||||
|
@ -4097,6 +4168,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
|
||||
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
|
||||
throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
|
||||
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) {
|
||||
throw new RegionTooBusyException(batchMutate[0].getExceptionMsg());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7900,7 +7973,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||
ClassSize.OBJECT +
|
||||
ClassSize.ARRAY +
|
||||
50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
|
||||
51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
|
||||
(14 * Bytes.SIZEOF_LONG) +
|
||||
3 * Bytes.SIZEOF_BOOLEAN);
|
||||
|
||||
|
@ -7927,6 +8000,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
+ 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes
|
||||
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
|
||||
+ ClassSize.STORE_SERVICES // store services
|
||||
+ StoreHotnessProtector.FIXED_SIZE
|
||||
;
|
||||
|
||||
@Override
|
||||
|
@ -8391,7 +8465,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
*/
|
||||
@Override
|
||||
public void onConfigurationChange(Configuration conf) {
|
||||
// Do nothing for now.
|
||||
this.storeHotnessProtector.update(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -43,6 +43,7 @@ import java.util.concurrent.ExecutorCompletionService;
|
|||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
@ -181,6 +182,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
|
||||
private final boolean verifyBulkLoads;
|
||||
|
||||
private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
|
||||
private final int parallelPutCountPrintThreshold;
|
||||
|
||||
private ScanInfo scanInfo;
|
||||
|
||||
// All access must be synchronized.
|
||||
|
@ -295,7 +299,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
|
||||
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
|
||||
}
|
||||
LOG.debug("Memstore type={}", className);
|
||||
this.offPeakHours = OffPeakHours.getInstance(conf);
|
||||
|
||||
// Setting up cache configuration for this family
|
||||
|
@ -334,6 +337,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
+ flushRetriesNumber);
|
||||
}
|
||||
cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
|
||||
|
||||
int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
|
||||
if (confPrintThreshold < 10) {
|
||||
confPrintThreshold = 10;
|
||||
}
|
||||
this.parallelPutCountPrintThreshold = confPrintThreshold;
|
||||
LOG.info("Memstore class name is " + className + " ; parallelPutCountPrintThreshold="
|
||||
+ parallelPutCountPrintThreshold);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -697,9 +708,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
public void add(final Cell cell, MemStoreSizing memstoreSizing) {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
this.memstore.add(cell, memstoreSizing);
|
||||
if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
|
||||
.getColumnFamilyName() + " too Busy!");
|
||||
}
|
||||
}
|
||||
this.memstore.add(cell, memstoreSizing);
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
currentParallelPutCount.decrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -709,9 +727,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
|
||||
.getColumnFamilyName() + " too Busy!");
|
||||
}
|
||||
}
|
||||
memstore.add(cells, memstoreSizing);
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
currentParallelPutCount.decrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2368,8 +2393,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
}
|
||||
|
||||
public static final long FIXED_OVERHEAD =
|
||||
ClassSize.align(ClassSize.OBJECT + (26 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
|
||||
+ (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
|
||||
ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
|
||||
+ (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
|
||||
|
||||
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
|
||||
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
|
||||
|
@ -2685,4 +2710,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
}
|
||||
}
|
||||
|
||||
public int getCurrentParallelPutCount() {
|
||||
return currentParallelPutCount.get();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -1039,6 +1040,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
builder.addResultOrException(getResultOrException(
|
||||
ClientProtos.Result.getDefaultInstance(), index));
|
||||
break;
|
||||
|
||||
case STORE_TOO_BUSY:
|
||||
e = new RegionTooBusyException(codes[i].getExceptionMsg());
|
||||
builder.addResultOrException(getResultOrException(e, index));
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
|
|
@ -280,4 +280,6 @@ public interface Store {
|
|||
* @return true if the memstore may need some extra memory space
|
||||
*/
|
||||
boolean isSloppyMemStore();
|
||||
|
||||
int getCurrentParallelPutCount();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,196 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.throttle;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
/**
|
||||
* StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it
|
||||
* does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with
|
||||
* dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM
|
||||
* degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented,
|
||||
* while StoreHotnessProtector is system-oriented, RS-self-protected mechanism.
|
||||
* <p>
|
||||
* There are three key parameters:
|
||||
* <p>
|
||||
* 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this
|
||||
* threshold, the HotProtector will work, 100 by default
|
||||
* <p>
|
||||
* 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at
|
||||
* the same time.
|
||||
* <p>
|
||||
* 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to
|
||||
* prepare writing puts to a Store at the same time.
|
||||
* <p>
|
||||
* Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and
|
||||
* WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not
|
||||
* enough since the actual concurrency of puts may still exceed the limit when MVCC contention or
|
||||
* slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed.
|
||||
* <p>
|
||||
* This protector is enabled by default and could be turned off by setting
|
||||
* hbase.region.store.parallel.put.limit to 0, supporting online configuration change.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class StoreHotnessProtector {
|
||||
private static final Log LOG = LogFactory.getLog(StoreHotnessProtector.class);
|
||||
private volatile int parallelPutToStoreThreadLimit;
|
||||
|
||||
private volatile int parallelPreparePutToStoreThreadLimit;
|
||||
public final static String PARALLEL_PUT_STORE_THREADS_LIMIT =
|
||||
"hbase.region.store.parallel.put.limit";
|
||||
public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
|
||||
"hbase.region.store.parallel.prepare.put.multiplier";
|
||||
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10;
|
||||
private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;
|
||||
public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
|
||||
"hbase.region.store.parallel.put.limit.min.column.count";
|
||||
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
|
||||
private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
|
||||
|
||||
private final Map<byte[], AtomicInteger> preparePutToStoreMap =
|
||||
new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
|
||||
private final Region region;
|
||||
|
||||
public StoreHotnessProtector(Region region, Configuration conf) {
|
||||
init(conf);
|
||||
this.region = region;
|
||||
}
|
||||
|
||||
public void init(Configuration conf) {
|
||||
this.parallelPutToStoreThreadLimit =
|
||||
conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
|
||||
this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
|
||||
DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
|
||||
this.parallelPutToStoreThreadLimitCheckMinColumnCount =
|
||||
conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
|
||||
DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);
|
||||
|
||||
}
|
||||
|
||||
public void update(Configuration conf) {
|
||||
init(conf);
|
||||
preparePutToStoreMap.clear();
|
||||
LOG.debug("update config: " + toString());
|
||||
}
|
||||
|
||||
public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
|
||||
if (!isEnable()) {
|
||||
return;
|
||||
}
|
||||
|
||||
String tooBusyStore = null;
|
||||
|
||||
for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
|
||||
Store store = this.region.getStore(e.getKey());
|
||||
if (store == null || e.getValue() == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
|
||||
|
||||
//we need to try to add #preparePutCount at first because preparePutToStoreMap will be
|
||||
//cleared when changing the configuration.
|
||||
preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger());
|
||||
AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey());
|
||||
if (preparePutCounter == null) {
|
||||
preparePutCounter = new AtomicInteger();
|
||||
preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter);
|
||||
}
|
||||
int preparePutCount = preparePutCounter.incrementAndGet();
|
||||
if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
|
||||
|| preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
|
||||
tooBusyStore = (tooBusyStore == null ?
|
||||
store.getColumnFamilyName() :
|
||||
tooBusyStore + "," + store.getColumnFamilyName());
|
||||
}
|
||||
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount
|
||||
+ "; currentParallelPutCount=" + store.getCurrentParallelPutCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (tooBusyStore != null) {
|
||||
String msg =
|
||||
"StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
|
||||
+ " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(msg);
|
||||
}
|
||||
throw new RegionTooBusyException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
public void finish(Map<byte[], List<Cell>> familyMaps) {
|
||||
if (!isEnable()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
|
||||
Store store = this.region.getStore(e.getKey());
|
||||
if (store == null || e.getValue() == null) {
|
||||
continue;
|
||||
}
|
||||
if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
|
||||
AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
|
||||
// preparePutToStoreMap will be cleared when changing the configuration, so it may turn
|
||||
// into a negative value. It will be not accuracy in a short time, it's a trade-off for
|
||||
// performance.
|
||||
if (counter != null && counter.decrementAndGet() < 0) {
|
||||
counter.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
|
||||
+ this.parallelPutToStoreThreadLimit + " ; minColumnNum="
|
||||
+ this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
|
||||
+ this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ?
|
||||
"enable" :
|
||||
"disable");
|
||||
}
|
||||
|
||||
public boolean isEnable() {
|
||||
// feature is enabled when parallelPutToStoreThreadLimit > 0
|
||||
return this.parallelPutToStoreThreadLimit > 0;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
|
||||
return preparePutToStoreMap;
|
||||
}
|
||||
|
||||
public static final long FIXED_SIZE =
|
||||
ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
|
||||
}
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.MutableSegment;
|
|||
import org.apache.hadoop.hbase.regionserver.Segment;
|
||||
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
|
@ -476,6 +477,14 @@ public class TestHeapSize {
|
|||
assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
cl = StoreHotnessProtector.class;
|
||||
actual = StoreHotnessProtector.FIXED_SIZE;
|
||||
expected = ClassSize.estimateBase(cl, false);
|
||||
if (expected != actual) {
|
||||
ClassSize.estimateBase(cl, true);
|
||||
assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
// Block cache key overhead. Only tests fixed overhead as estimating heap
|
||||
// size of strings is hard.
|
||||
cl = BlockCacheKey.class;
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.throttle;
|
||||
|
||||
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER;
|
||||
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT;
|
||||
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Assert;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestStoreHotnessProtector {
|
||||
|
||||
@ClassRule public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestStoreHotnessProtector.class);
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testPreparePutCounter() throws Exception {
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0);
|
||||
conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10);
|
||||
conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
|
||||
Region mockRegion = mock(Region.class);
|
||||
StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf);
|
||||
|
||||
Store mockStore1 = mock(Store.class);
|
||||
RegionInfo mockRegionInfo = mock(RegionInfo.class);
|
||||
byte[] family = "testF1".getBytes();
|
||||
|
||||
when(mockRegion.getStore(family)).thenReturn(mockStore1);
|
||||
when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
|
||||
when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1");
|
||||
|
||||
when(mockStore1.getCurrentParallelPutCount()).thenReturn(1);
|
||||
when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1");
|
||||
|
||||
final Map<byte[], List<Cell>> familyMaps = new HashMap<>();
|
||||
familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class)));
|
||||
|
||||
final AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
|
||||
// PreparePutCounter not access limit
|
||||
|
||||
int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10) * conf
|
||||
.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
|
||||
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
|
||||
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
executorService.execute(() -> {
|
||||
try {
|
||||
storeHotnessProtector.start(familyMaps);
|
||||
} catch (RegionTooBusyException e) {
|
||||
e.printStackTrace();
|
||||
exception.set(e);
|
||||
} finally {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
countDownLatch.await(60, TimeUnit.SECONDS);
|
||||
//no exception
|
||||
Assert.assertEquals(exception.get(), null);
|
||||
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
|
||||
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
|
||||
threadCount);
|
||||
|
||||
// access limit
|
||||
|
||||
try {
|
||||
storeHotnessProtector.start(familyMaps);
|
||||
} catch (RegionTooBusyException e) {
|
||||
e.printStackTrace();
|
||||
exception.set(e);
|
||||
}
|
||||
|
||||
Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class);
|
||||
|
||||
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
|
||||
// when access limit, counter will not changed.
|
||||
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
|
||||
threadCount + 1);
|
||||
|
||||
storeHotnessProtector.finish(familyMaps);
|
||||
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
|
||||
threadCount);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue