From c6d89b494c1b217e92b58e95c6da1934462edd33 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Wed, 14 Mar 2018 18:18:50 +0800 Subject: [PATCH] HBASE-19389 Limit concurrency of put with dense (hundreds) columns to prevent write handler exhausted Signed-off-by: Yu Li --- .../org/apache/hadoop/hbase/HConstants.java | 1 + .../hadoop/hbase/regionserver/HRegion.java | 86 +++++++- .../hadoop/hbase/regionserver/HStore.java | 39 +++- .../hbase/regionserver/RSRpcServices.java | 6 + .../hadoop/hbase/regionserver/Store.java | 2 + .../throttle/StoreHotnessProtector.java | 196 ++++++++++++++++++ .../apache/hadoop/hbase/io/TestHeapSize.java | 9 + .../throttle/TestStoreHotnessProtector.java | 130 ++++++++++++ 8 files changed, 458 insertions(+), 11 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index adc7194deb2..d826ca09111 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -89,6 +89,7 @@ public final class HConstants { NOT_RUN, SUCCESS, BAD_FAMILY, + STORE_TOO_BUSY, SANITY_CHECK_FAILURE, FAILURE } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9f3d9bd58f7..ba7ab5b82bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -145,6 +145,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; @@ -673,6 +674,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final NavigableMap replicationScope = new TreeMap<>( Bytes.BYTES_COMPARATOR); + private final StoreHotnessProtector storeHotnessProtector; + /** * HRegion constructor. This constructor should only be used for testing and * extensions. Instances of HRegion should be instantiated with the @@ -793,6 +796,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT); this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ? 
DEFAULT_DURABILITY : htd.getDurability(); + + this.storeHotnessProtector = new StoreHotnessProtector(this, conf); + if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -805,8 +811,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.metricsRegion = null; } if (LOG.isDebugEnabled()) { - // Write out region name as string and its encoded name. - LOG.debug("Instantiated " + this); + // Write out region name, its encoded name and storeHotnessProtector as string. + LOG.debug("Instantiated " + this +"; "+ storeHotnessProtector.toString()); } configurationManager = Optional.empty(); @@ -3169,9 +3175,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!isOperationPending(lastIndexExclusive)) { continue; } + + // HBASE-19389 Limit concurrency of put with dense (hundreds) columns to avoid exhausting + // RS handlers, covering both MutationBatchOperation and ReplayBatchOperation + // The BAD_FAMILY/SANITY_CHECK_FAILURE cases are handled in checkAndPrepare phase and won't + // pass the isOperationPending check + Map> curFamilyCellMap = + getMutation(lastIndexExclusive).getFamilyCellMap(); + try { + // start the protector before acquiring row lock considering performance, and will finish + // it when encountering exception + region.storeHotnessProtector.start(curFamilyCellMap); + } catch (RegionTooBusyException rtbe) { + region.storeHotnessProtector.finish(curFamilyCellMap); + if (isAtomic()) { + throw rtbe; + } + retCodeDetails[lastIndexExclusive] = + new OperationStatus(OperationStatusCode.STORE_TOO_BUSY, rtbe.getMessage()); + continue; + } + Mutation mutation = getMutation(lastIndexExclusive); // If we haven't got any rows in our batch, we should block to get the next one. RowLock rowLock = null; + boolean throwException = false; try { // if atomic then get exclusive lock, else shared lock rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock); @@ -3179,16 +3207,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // NOTE: We will retry when other exceptions, but we should stop if we receive // TimeoutIOException or InterruptedIOException as operation has timed out or // interrupted respectively. 
+ throwException = true; throw e; } catch (IOException ioe) { LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); if (isAtomic()) { // fail, atomic means all or none + throwException = true; throw ioe; } + } catch (Throwable throwable) { + throwException = true; + throw throwable; + } finally { + if (throwException) { + region.storeHotnessProtector.finish(curFamilyCellMap); + } } if (rowLock == null) { // We failed to grab another lock if (isAtomic()) { + region.storeHotnessProtector.finish(curFamilyCellMap); throw new IOException("Can't apply all operations atomically!"); } break; // Stop acquiring more rows for this batch @@ -3274,7 +3312,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public void doPostOpCleanupForMiniBatch( final MiniBatchOperationInProgress miniBatchOp, final WALEdit walEdit, - boolean success) throws IOException {} + boolean success) throws IOException { + doFinishHotnessProtector(miniBatchOp); + } + + private void doFinishHotnessProtector( + final MiniBatchOperationInProgress miniBatchOp) { + // check and return if the protector is not enabled + if (!region.storeHotnessProtector.isEnable()) { + return; + } + // miniBatchOp is null, if and only if lockRowsAndBuildMiniBatch throwing exception. + // This case was handled. + if (miniBatchOp == null) { + return; + } + + final int finalLastIndexExclusive = miniBatchOp.getLastIndexExclusive(); + + for (int i = nextIndexToProcess; i < finalLastIndexExclusive; i++) { + switch (retCodeDetails[i].getOperationStatusCode()) { + case SUCCESS: + case FAILURE: + region.storeHotnessProtector.finish(getMutation(i).getFamilyCellMap()); + break; + default: + // do nothing + // We won't start the protector for NOT_RUN/BAD_FAMILY/SANITY_CHECK_FAILURE and the + // STORE_TOO_BUSY case is handled in StoreHotnessProtector#start + break; + } + } + } /** * Atomically apply the given map of family->edits to the memstore. @@ -3493,6 +3562,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress miniBatchOp, final WALEdit walEdit, boolean success) throws IOException { + + super.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, success); if (miniBatchOp != null) { // synced so that the coprocessor contract is adhered to. 
if (region.coprocessorHost != null) { @@ -4086,6 +4157,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg()); + } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) { + throw new RegionTooBusyException(batchMutate[0].getExceptionMsg()); } } @@ -7889,7 +7962,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 3 * Bytes.SIZEOF_BOOLEAN); @@ -7916,6 +7989,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress + ClassSize.STORE_SERVICES // store services + + StoreHotnessProtector.FIXED_SIZE ; @Override @@ -8380,7 +8454,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ @Override public void onConfigurationChange(Configuration conf) { - // Do nothing for now. + this.storeHotnessProtector.update(conf); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 68a057a7013..1a382ad2471 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -178,6 +179,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat private final boolean verifyBulkLoads; + private final AtomicInteger currentParallelPutCount = new AtomicInteger(0); + private final int parallelPutCountPrintThreshold; + private ScanInfo scanInfo; // All access must be synchronized. 
@@ -292,7 +296,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction }); } - LOG.debug("Memstore type={}", className); this.offPeakHours = OffPeakHours.getInstance(conf); // Setting up cache configuration for this family @@ -331,6 +334,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat + flushRetriesNumber); } cryptoContext = EncryptionUtil.createEncryptionContext(conf, family); + + int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50); + if (confPrintThreshold < 10) { + confPrintThreshold = 10; + } + this.parallelPutCountPrintThreshold = confPrintThreshold; + LOG.info("Memstore class name is " + className + " ; parallelPutCountPrintThreshold=" + + parallelPutCountPrintThreshold); } /** @@ -694,9 +705,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat public void add(final Cell cell, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { - this.memstore.add(cell, memstoreSizing); + if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { + if (LOG.isTraceEnabled()) { + LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this + .getColumnFamilyName() + " too Busy!"); + } + } + this.memstore.add(cell, memstoreSizing); } finally { lock.readLock().unlock(); + currentParallelPutCount.decrementAndGet(); } } @@ -706,9 +724,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat public void add(final Iterable cells, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { + if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { + if (LOG.isTraceEnabled()) { + LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this + .getColumnFamilyName() + " too Busy!"); + } + } memstore.add(cells, memstoreSizing); } finally { lock.readLock().unlock(); + currentParallelPutCount.decrementAndGet(); } } @@ -2326,8 +2351,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (26 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) - + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); + ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK @@ -2576,4 +2601,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat lock.writeLock().unlock(); } } + + public int getCurrentParallelPutCount() { + return currentParallelPutCount.get(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index cceae285277..acf25cff647 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.NotServingRegionException; 
import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -1039,6 +1040,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.addResultOrException(getResultOrException( ClientProtos.Result.getDefaultInstance(), index)); break; + + case STORE_TOO_BUSY: + e = new RegionTooBusyException(codes[i].getExceptionMsg()); + builder.addResultOrException(getResultOrException(e, index)); + break; } } } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 042129f5246..6eb9f18b703 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -280,4 +280,6 @@ public interface Store { * @return true if the memstore may need some extra memory space */ boolean isSloppyMemStore(); + + int getCurrentParallelPutCount(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java new file mode 100644 index 00000000000..a237a52dc6c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +/** + * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it + * does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with + * dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM + * degrades when concurrency goes up. It's not a kind of throttling. 
Throttling is user-oriented,
+ * while StoreHotnessProtector is a system-oriented, self-protecting mechanism for the RS.
+ * <p>
+ * There are three key parameters (a configuration sketch follows the list):
+ * <p>
+ * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the number of columns in a mutation
+ * exceeds this threshold, the protector takes effect; 100 by default.
+ * <p>
+ * 2. parallelPutToStoreThreadLimit: The maximum number of threads allowed to write puts to a
+ * Store at the same time.
+ * <p>
+ * 3. parallelPreparePutToStoreThreadLimit: The maximum number of threads allowed to prepare
+ * writing puts to a Store at the same time.
+ * <p>
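+ * A minimal sketch of tuning these limits through configuration; the property names are the
+ * ones this class defines below, while the values are arbitrary illustrations rather than
+ * recommendations:
+ * <pre>
+ * Configuration conf = HBaseConfiguration.create();
+ * // cap concurrent memstore writers per store (0 disables the protector)
+ * conf.setInt("hbase.region.store.parallel.put.limit", 10);
+ * // prepare-put limit = multiplier * parallelPutToStoreThreadLimit
+ * conf.setInt("hbase.region.store.parallel.prepare.put.multiplier", 2);
+ * // only engage for puts touching more than this many columns
+ * conf.setInt("hbase.region.store.parallel.put.limit.min.column.count", 100);
+ * </pre>
+ * <p>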
+ * Notice that our write pipeline includes three key stages: MVCC acquisition, MemStore writing,
+ * and WAL sync. Limiting only the concurrency of puts written to a Store
+ * (parallelPutToStoreThreadLimit) is not enough, since the actual concurrency of puts may still
+ * exceed the limit when MVCC contention or slow WAL sync happens. This is why
+ * parallelPreparePutToStoreThreadLimit is needed.
+ * <p>
+ * This protector is enabled by default and could be turned off by setting + * hbase.region.store.parallel.put.limit to 0, supporting online configuration change. + */ +@InterfaceAudience.Private +public class StoreHotnessProtector { + private static final Log LOG = LogFactory.getLog(StoreHotnessProtector.class); + private volatile int parallelPutToStoreThreadLimit; + + private volatile int parallelPreparePutToStoreThreadLimit; + public final static String PARALLEL_PUT_STORE_THREADS_LIMIT = + "hbase.region.store.parallel.put.limit"; + public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = + "hbase.region.store.parallel.prepare.put.multiplier"; + private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10; + private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount; + public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT = + "hbase.region.store.parallel.put.limit.min.column.count"; + private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100; + private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2; + + private final Map preparePutToStoreMap = + new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR); + private final Region region; + + public StoreHotnessProtector(Region region, Configuration conf) { + init(conf); + this.region = region; + } + + public void init(Configuration conf) { + this.parallelPutToStoreThreadLimit = + conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT); + this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, + DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit; + this.parallelPutToStoreThreadLimitCheckMinColumnCount = + conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, + DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM); + + } + + public void update(Configuration conf) { + init(conf); + preparePutToStoreMap.clear(); + LOG.debug("update config: " + toString()); + } + + public void start(Map> familyMaps) throws RegionTooBusyException { + if (!isEnable()) { + return; + } + + String tooBusyStore = null; + + for (Map.Entry> e : familyMaps.entrySet()) { + Store store = this.region.getStore(e.getKey()); + if (store == null || e.getValue() == null) { + continue; + } + + if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { + + //we need to try to add #preparePutCount at first because preparePutToStoreMap will be + //cleared when changing the configuration. + preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger()); + AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey()); + if (preparePutCounter == null) { + preparePutCounter = new AtomicInteger(); + preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter); + } + int preparePutCount = preparePutCounter.incrementAndGet(); + if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit + || preparePutCount > this.parallelPreparePutToStoreThreadLimit) { + tooBusyStore = (tooBusyStore == null ? 
+ store.getColumnFamilyName() : + tooBusyStore + "," + store.getColumnFamilyName()); + } + + if (LOG.isTraceEnabled()) { + LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount + + "; currentParallelPutCount=" + store.getCurrentParallelPutCount()); + } + } + } + + if (tooBusyStore != null) { + String msg = + "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore + + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")"; + if (LOG.isTraceEnabled()) { + LOG.trace(msg); + } + throw new RegionTooBusyException(msg); + } + } + + public void finish(Map> familyMaps) { + if (!isEnable()) { + return; + } + + for (Map.Entry> e : familyMaps.entrySet()) { + Store store = this.region.getStore(e.getKey()); + if (store == null || e.getValue() == null) { + continue; + } + if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { + AtomicInteger counter = preparePutToStoreMap.get(e.getKey()); + // preparePutToStoreMap will be cleared when changing the configuration, so it may turn + // into a negative value. It will be not accuracy in a short time, it's a trade-off for + // performance. + if (counter != null && counter.decrementAndGet() < 0) { + counter.incrementAndGet(); + } + } + } + } + + public String toString() { + return "StoreHotnessProtector, parallelPutToStoreThreadLimit=" + + this.parallelPutToStoreThreadLimit + " ; minColumnNum=" + + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit=" + + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ? + "enable" : + "disable"); + } + + public boolean isEnable() { + // feature is enabled when parallelPutToStoreThreadLimit > 0 + return this.parallelPutToStoreThreadLimit > 0; + } + + @VisibleForTesting + Map getPreparePutToStoreMap() { + return preparePutToStoreMap; + } + + public static final long FIXED_SIZE = + ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT); +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index f979397f453..2d454e58c1c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.MutableSegment; import org.apache.hadoop.hbase.regionserver.Segment; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker; +import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ClassSize; @@ -476,6 +477,14 @@ public class TestHeapSize { assertEquals(expected, actual); } + cl = StoreHotnessProtector.class; + actual = StoreHotnessProtector.FIXED_SIZE; + expected = ClassSize.estimateBase(cl, false); + if (expected != actual) { + ClassSize.estimateBase(cl, true); + assertEquals(expected, actual); + } + // Block cache key overhead. Only tests fixed overhead as estimating heap // size of strings is hard. 
cl = BlockCacheKey.class; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java new file mode 100644 index 00000000000..6d4193474b2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER; +import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT; +import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +@Category(SmallTests.class) +public class TestStoreHotnessProtector { + + @ClassRule public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestStoreHotnessProtector.class); + + @Test(timeout = 60000) + public void testPreparePutCounter() throws Exception { + + ExecutorService executorService = Executors.newFixedThreadPool(10); + + Configuration conf = new Configuration(); + conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0); + conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10); + conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); + Region mockRegion = mock(Region.class); + StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf); + + Store mockStore1 = mock(Store.class); + RegionInfo mockRegionInfo = mock(RegionInfo.class); + byte[] family = "testF1".getBytes(); + + when(mockRegion.getStore(family)).thenReturn(mockStore1); + when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); + when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1"); + + when(mockStore1.getCurrentParallelPutCount()).thenReturn(1); + when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1"); + + final Map> familyMaps = new HashMap<>(); + familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class))); + + final AtomicReference exception = new AtomicReference<>(); + + // PreparePutCounter not access limit + + int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10) * conf + .getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); + CountDownLatch countDownLatch = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + executorService.execute(() -> { + try { + storeHotnessProtector.start(familyMaps); + } catch (RegionTooBusyException e) { + 
e.printStackTrace();
+          exception.set(e);
+        } finally {
+          countDownLatch.countDown();
+        }
+      });
+    }
+
+    countDownLatch.await(60, TimeUnit.SECONDS);
+    // no exception expected while the prepare-put counter stays within the limit
+    Assert.assertNull(exception.get());
+    Assert.assertEquals(1, storeHotnessProtector.getPreparePutToStoreMap().size());
+    Assert.assertEquals(threadCount,
+        storeHotnessProtector.getPreparePutToStoreMap().get(family).get());
+
+    // now exceed the limit
+
+    try {
+      storeHotnessProtector.start(familyMaps);
+    } catch (RegionTooBusyException e) {
+      e.printStackTrace();
+      exception.set(e);
+    }
+
+    Assert.assertEquals(RegionTooBusyException.class, exception.get().getClass());
+
+    Assert.assertEquals(1, storeHotnessProtector.getPreparePutToStoreMap().size());
+    // when the limit is exceeded, start() still increments the counter; finish() must undo it
+    Assert.assertEquals(threadCount + 1,
+        storeHotnessProtector.getPreparePutToStoreMap().get(family).get());
+
+    storeHotnessProtector.finish(familyMaps);
+    Assert.assertEquals(threadCount,
+        storeHotnessProtector.getPreparePutToStoreMap().get(family).get());
+  }
+
+}
\ No newline at end of file
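
For reference, here is a minimal, self-contained sketch of the accounting scheme the patch
implements. SimpleHotnessProtector and TooBusyException are hypothetical stand-ins, not HBase
classes, and the sketch models only the prepare-put counter; the real StoreHotnessProtector
above additionally compares Store#getCurrentParallelPutCount() against
parallelPutToStoreThreadLimit and keys its map by column-family bytes.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class TooBusyException extends Exception {
  TooBusyException(String msg) {
    super(msg);
  }
}

class SimpleHotnessProtector {
  private final int minColumnCount;  // engage only for dense puts
  private final int preparePutLimit; // cap on concurrent prepare phases per store
  private final ConcurrentHashMap<String, AtomicInteger> prepareCounters =
      new ConcurrentHashMap<>();

  SimpleHotnessProtector(int minColumnCount, int preparePutLimit) {
    this.minColumnCount = minColumnCount;
    this.preparePutLimit = preparePutLimit;
  }

  /** Called before the row lock is taken; rejects the put if the store is too hot. */
  void start(String family, int columnCount) throws TooBusyException {
    if (columnCount <= minColumnCount) {
      return; // sparse puts bypass the protector, mirroring the min-column-count check
    }
    AtomicInteger counter = prepareCounters.computeIfAbsent(family, k -> new AtomicInteger());
    // Increment before checking; a rejected start() leaves its increment behind, so the
    // caller must invoke finish() even on failure, exactly as HRegion does in the patch.
    if (counter.incrementAndGet() > preparePutLimit) {
      throw new TooBusyException("store " + family + " above prepare-put limit " + preparePutLimit);
    }
  }

  /** Called once the mutation completes, successfully or not. */
  void finish(String family, int columnCount) {
    if (columnCount <= minColumnCount) {
      return;
    }
    AtomicInteger counter = prepareCounters.get(family);
    if (counter != null) {
      counter.decrementAndGet();
    }
  }
}

public class ProtectorSketch {
  public static void main(String[] args) throws Exception {
    // Engage above 100 columns, allow at most 2 concurrent prepares per store.
    SimpleHotnessProtector protector = new SimpleHotnessProtector(100, 2);
    protector.start("cf", 500); // dense put, first prepare slot
    protector.start("cf", 500); // second prepare slot
    try {
      protector.start("cf", 500); // third concurrent prepare exceeds the limit
    } catch (TooBusyException e) {
      protector.finish("cf", 500); // undo the increment left by the rejected start()
      System.out.println("rejected: " + e.getMessage());
    }
  }
}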