HBASE-19389 Limit concurrency of put with dense (hundreds) columns to prevent write handler exhausted

Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
Chance Li 2018-03-14 18:18:50 +08:00 committed by Yu Li
parent c8ecfc5461
commit 47b227b55b
8 changed files with 458 additions and 11 deletions

View File

@ -89,6 +89,7 @@ public final class HConstants {
NOT_RUN,
SUCCESS,
BAD_FAMILY,
STORE_TOO_BUSY,
SANITY_CHECK_FAILURE,
FAILURE
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -145,6 +145,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
@ -673,6 +674,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
Bytes.BYTES_COMPARATOR);
private final StoreHotnessProtector storeHotnessProtector;
/**
* HRegion constructor. This constructor should only be used for testing and
* extensions. Instances of HRegion should be instantiated with the
@ -793,6 +796,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ?
DEFAULT_DURABILITY : htd.getDurability();
this.storeHotnessProtector = new StoreHotnessProtector(this, conf);
if (rsServices != null) {
this.rsAccounting = this.rsServices.getRegionServerAccounting();
// don't initialize coprocessors if not running within a regionserver
@ -805,8 +811,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.metricsRegion = null;
}
if (LOG.isDebugEnabled()) {
// Write out region name as string and its encoded name.
LOG.debug("Instantiated " + this);
// Write out region name, its encoded name and storeHotnessProtector as string.
LOG.debug("Instantiated " + this +"; "+ storeHotnessProtector.toString());
}
configurationManager = Optional.empty();
@ -3169,9 +3175,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!isOperationPending(lastIndexExclusive)) {
continue;
}
// HBASE-19389 Limit concurrency of put with dense (hundreds) columns to avoid exhausting
// RS handlers, covering both MutationBatchOperation and ReplayBatchOperation
// The BAD_FAMILY/SANITY_CHECK_FAILURE cases are handled in checkAndPrepare phase and won't
// pass the isOperationPending check
Map<byte[], List<Cell>> curFamilyCellMap =
getMutation(lastIndexExclusive).getFamilyCellMap();
try {
// start the protector before acquiring row lock considering performance, and will finish
// it when encountering exception
region.storeHotnessProtector.start(curFamilyCellMap);
} catch (RegionTooBusyException rtbe) {
region.storeHotnessProtector.finish(curFamilyCellMap);
if (isAtomic()) {
throw rtbe;
}
retCodeDetails[lastIndexExclusive] =
new OperationStatus(OperationStatusCode.STORE_TOO_BUSY, rtbe.getMessage());
continue;
}
Mutation mutation = getMutation(lastIndexExclusive);
// If we haven't got any rows in our batch, we should block to get the next one.
RowLock rowLock = null;
boolean throwException = false;
try {
// if atomic then get exclusive lock, else shared lock
rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
@ -3179,16 +3207,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// NOTE: We will retry when other exceptions, but we should stop if we receive
// TimeoutIOException or InterruptedIOException as operation has timed out or
// interrupted respectively.
throwException = true;
throw e;
} catch (IOException ioe) {
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
if (isAtomic()) { // fail, atomic means all or none
throwException = true;
throw ioe;
}
} catch (Throwable throwable) {
throwException = true;
throw throwable;
} finally {
if (throwException) {
region.storeHotnessProtector.finish(curFamilyCellMap);
}
}
if (rowLock == null) {
// We failed to grab another lock
if (isAtomic()) {
region.storeHotnessProtector.finish(curFamilyCellMap);
throw new IOException("Can't apply all operations atomically!");
}
break; // Stop acquiring more rows for this batch
@ -3274,7 +3312,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public void doPostOpCleanupForMiniBatch(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit,
boolean success) throws IOException {}
boolean success) throws IOException {
doFinishHotnessProtector(miniBatchOp);
}
private void doFinishHotnessProtector(
final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
// check and return if the protector is not enabled
if (!region.storeHotnessProtector.isEnable()) {
return;
}
// miniBatchOp is null, if and only if lockRowsAndBuildMiniBatch throwing exception.
// This case was handled.
if (miniBatchOp == null) {
return;
}
final int finalLastIndexExclusive = miniBatchOp.getLastIndexExclusive();
for (int i = nextIndexToProcess; i < finalLastIndexExclusive; i++) {
switch (retCodeDetails[i].getOperationStatusCode()) {
case SUCCESS:
case FAILURE:
region.storeHotnessProtector.finish(getMutation(i).getFamilyCellMap());
break;
default:
// do nothing
// We won't start the protector for NOT_RUN/BAD_FAMILY/SANITY_CHECK_FAILURE and the
// STORE_TOO_BUSY case is handled in StoreHotnessProtector#start
break;
}
}
}
/**
* Atomically apply the given map of family->edits to the memstore.
@ -3493,6 +3562,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
final WALEdit walEdit, boolean success) throws IOException {
super.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, success);
if (miniBatchOp != null) {
// synced so that the coprocessor contract is adhered to.
if (region.coprocessorHost != null) {
@ -4086,6 +4157,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) {
throw new RegionTooBusyException(batchMutate[0].getExceptionMsg());
}
}
@ -7889,7 +7962,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
3 * Bytes.SIZEOF_BOOLEAN);
@ -7916,6 +7989,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
+ 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
+ ClassSize.STORE_SERVICES // store services
+ StoreHotnessProtector.FIXED_SIZE
;
@Override
@ -8380,7 +8454,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
@Override
public void onConfigurationChange(Configuration conf) {
// Do nothing for now.
this.storeHotnessProtector.update(conf);
}
/**

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -178,6 +179,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
private final boolean verifyBulkLoads;
private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
private final int parallelPutCountPrintThreshold;
private ScanInfo scanInfo;
// All access must be synchronized.
@ -292,7 +296,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
}
LOG.debug("Memstore type={}", className);
this.offPeakHours = OffPeakHours.getInstance(conf);
// Setting up cache configuration for this family
@ -331,6 +334,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
+ flushRetriesNumber);
}
cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
if (confPrintThreshold < 10) {
confPrintThreshold = 10;
}
this.parallelPutCountPrintThreshold = confPrintThreshold;
LOG.info("Memstore class name is " + className + " ; parallelPutCountPrintThreshold="
+ parallelPutCountPrintThreshold);
}
/**
@ -694,9 +705,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
public void add(final Cell cell, MemStoreSizing memstoreSizing) {
lock.readLock().lock();
try {
if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
if (LOG.isTraceEnabled()) {
LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
.getColumnFamilyName() + " too Busy!");
}
}
this.memstore.add(cell, memstoreSizing);
} finally {
lock.readLock().unlock();
currentParallelPutCount.decrementAndGet();
}
}
@ -706,9 +724,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
lock.readLock().lock();
try {
if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
if (LOG.isTraceEnabled()) {
LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
.getColumnFamilyName() + " too Busy!");
}
}
memstore.add(cells, memstoreSizing);
} finally {
lock.readLock().unlock();
currentParallelPutCount.decrementAndGet();
}
}
@ -2326,8 +2351,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}
public static final long FIXED_OVERHEAD =
ClassSize.align(ClassSize.OBJECT + (26 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
+ (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
+ (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
@ -2576,4 +2601,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
lock.writeLock().unlock();
}
}
public int getCurrentParallelPutCount() {
return currentParallelPutCount.get();
}
}

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@ -1039,6 +1040,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.addResultOrException(getResultOrException(
ClientProtos.Result.getDefaultInstance(), index));
break;
case STORE_TOO_BUSY:
e = new RegionTooBusyException(codes[i].getExceptionMsg());
builder.addResultOrException(getResultOrException(e, index));
break;
}
}
} finally {

View File

@ -280,4 +280,6 @@ public interface Store {
* @return true if the memstore may need some extra memory space
*/
boolean isSloppyMemStore();
int getCurrentParallelPutCount();
}

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it
* does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with
* dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM
* degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented,
* while StoreHotnessProtector is system-oriented, RS-self-protected mechanism.
* <p>
* There are three key parameters:
* <p>
* 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this
* threshold, the HotProtector will work, 100 by default
* <p>
* 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at
* the same time.
* <p>
* 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to
* prepare writing puts to a Store at the same time.
* <p>
* Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and
* WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not
* enough since the actual concurrency of puts may still exceed the limit when MVCC contention or
* slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed.
* <p>
* This protector is enabled by default and could be turned off by setting
* hbase.region.store.parallel.put.limit to 0, supporting online configuration change.
*/
@InterfaceAudience.Private
public class StoreHotnessProtector {
private static final Log LOG = LogFactory.getLog(StoreHotnessProtector.class);
private volatile int parallelPutToStoreThreadLimit;
private volatile int parallelPreparePutToStoreThreadLimit;
public final static String PARALLEL_PUT_STORE_THREADS_LIMIT =
"hbase.region.store.parallel.put.limit";
public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
"hbase.region.store.parallel.prepare.put.multiplier";
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10;
private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;
public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
"hbase.region.store.parallel.put.limit.min.column.count";
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
private final Map<byte[], AtomicInteger> preparePutToStoreMap =
new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
private final Region region;
public StoreHotnessProtector(Region region, Configuration conf) {
init(conf);
this.region = region;
}
public void init(Configuration conf) {
this.parallelPutToStoreThreadLimit =
conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
this.parallelPutToStoreThreadLimitCheckMinColumnCount =
conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);
}
public void update(Configuration conf) {
init(conf);
preparePutToStoreMap.clear();
LOG.debug("update config: " + toString());
}
public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
if (!isEnable()) {
return;
}
String tooBusyStore = null;
for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
Store store = this.region.getStore(e.getKey());
if (store == null || e.getValue() == null) {
continue;
}
if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
//we need to try to add #preparePutCount at first because preparePutToStoreMap will be
//cleared when changing the configuration.
preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger());
AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey());
if (preparePutCounter == null) {
preparePutCounter = new AtomicInteger();
preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter);
}
int preparePutCount = preparePutCounter.incrementAndGet();
if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
|| preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
tooBusyStore = (tooBusyStore == null ?
store.getColumnFamilyName() :
tooBusyStore + "," + store.getColumnFamilyName());
}
if (LOG.isTraceEnabled()) {
LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount
+ "; currentParallelPutCount=" + store.getCurrentParallelPutCount());
}
}
}
if (tooBusyStore != null) {
String msg =
"StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
+ " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
if (LOG.isTraceEnabled()) {
LOG.trace(msg);
}
throw new RegionTooBusyException(msg);
}
}
public void finish(Map<byte[], List<Cell>> familyMaps) {
if (!isEnable()) {
return;
}
for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
Store store = this.region.getStore(e.getKey());
if (store == null || e.getValue() == null) {
continue;
}
if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
// preparePutToStoreMap will be cleared when changing the configuration, so it may turn
// into a negative value. It will be not accuracy in a short time, it's a trade-off for
// performance.
if (counter != null && counter.decrementAndGet() < 0) {
counter.incrementAndGet();
}
}
}
}
public String toString() {
return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
+ this.parallelPutToStoreThreadLimit + " ; minColumnNum="
+ this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
+ this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ?
"enable" :
"disable");
}
public boolean isEnable() {
// feature is enabled when parallelPutToStoreThreadLimit > 0
return this.parallelPutToStoreThreadLimit > 0;
}
@VisibleForTesting
Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
return preparePutToStoreMap;
}
public static final long FIXED_SIZE =
ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
}

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.MutableSegment;
import org.apache.hadoop.hbase.regionserver.Segment;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassSize;
@ -476,6 +477,14 @@ public class TestHeapSize {
assertEquals(expected, actual);
}
cl = StoreHotnessProtector.class;
actual = StoreHotnessProtector.FIXED_SIZE;
expected = ClassSize.estimateBase(cl, false);
if (expected != actual) {
ClassSize.estimateBase(cl, true);
assertEquals(expected, actual);
}
// Block cache key overhead. Only tests fixed overhead as estimating heap
// size of strings is hard.
cl = BlockCacheKey.class;

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.throttle;
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER;
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT;
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@Category(SmallTests.class)
public class TestStoreHotnessProtector {
@ClassRule public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestStoreHotnessProtector.class);
@Test(timeout = 60000)
public void testPreparePutCounter() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Configuration conf = new Configuration();
conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0);
conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10);
conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
Region mockRegion = mock(Region.class);
StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf);
Store mockStore1 = mock(Store.class);
RegionInfo mockRegionInfo = mock(RegionInfo.class);
byte[] family = "testF1".getBytes();
when(mockRegion.getStore(family)).thenReturn(mockStore1);
when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1");
when(mockStore1.getCurrentParallelPutCount()).thenReturn(1);
when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1");
final Map<byte[], List<Cell>> familyMaps = new HashMap<>();
familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class)));
final AtomicReference<Exception> exception = new AtomicReference<>();
// PreparePutCounter not access limit
int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10) * conf
.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.execute(() -> {
try {
storeHotnessProtector.start(familyMaps);
} catch (RegionTooBusyException e) {
e.printStackTrace();
exception.set(e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(60, TimeUnit.SECONDS);
//no exception
Assert.assertEquals(exception.get(), null);
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
threadCount);
// access limit
try {
storeHotnessProtector.start(familyMaps);
} catch (RegionTooBusyException e) {
e.printStackTrace();
exception.set(e);
}
Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class);
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
// when access limit, counter will not changed.
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
threadCount + 1);
storeHotnessProtector.finish(familyMaps);
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
threadCount);
}
}