diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index d826ca09111..adc7194deb2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -89,7 +89,6 @@ public final class HConstants { NOT_RUN, SUCCESS, BAD_FAMILY, - STORE_TOO_BUSY, SANITY_CHECK_FAILURE, FAILURE } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ba7ab5b82bf..9f3d9bd58f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -145,7 +145,6 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; -import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; @@ -674,8 +673,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final NavigableMap replicationScope = new TreeMap<>( Bytes.BYTES_COMPARATOR); - private final StoreHotnessProtector storeHotnessProtector; - /** * HRegion constructor. This constructor should only be used for testing and * extensions. Instances of HRegion should be instantiated with the @@ -796,9 +793,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT); this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ? DEFAULT_DURABILITY : htd.getDurability(); - - this.storeHotnessProtector = new StoreHotnessProtector(this, conf); - if (rsServices != null) { this.rsAccounting = this.rsServices.getRegionServerAccounting(); // don't initialize coprocessors if not running within a regionserver @@ -811,8 +805,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.metricsRegion = null; } if (LOG.isDebugEnabled()) { - // Write out region name, its encoded name and storeHotnessProtector as string. - LOG.debug("Instantiated " + this +"; "+ storeHotnessProtector.toString()); + // Write out region name as string and its encoded name. + LOG.debug("Instantiated " + this); } configurationManager = Optional.empty(); @@ -3175,31 +3169,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!isOperationPending(lastIndexExclusive)) { continue; } - - // HBASE-19389 Limit concurrency of put with dense (hundreds) columns to avoid exhausting - // RS handlers, covering both MutationBatchOperation and ReplayBatchOperation - // The BAD_FAMILY/SANITY_CHECK_FAILURE cases are handled in checkAndPrepare phase and won't - // pass the isOperationPending check - Map> curFamilyCellMap = - getMutation(lastIndexExclusive).getFamilyCellMap(); - try { - // start the protector before acquiring row lock considering performance, and will finish - // it when encountering exception - region.storeHotnessProtector.start(curFamilyCellMap); - } catch (RegionTooBusyException rtbe) { - region.storeHotnessProtector.finish(curFamilyCellMap); - if (isAtomic()) { - throw rtbe; - } - retCodeDetails[lastIndexExclusive] = - new OperationStatus(OperationStatusCode.STORE_TOO_BUSY, rtbe.getMessage()); - continue; - } - Mutation mutation = getMutation(lastIndexExclusive); // If we haven't got any rows in our batch, we should block to get the next one. RowLock rowLock = null; - boolean throwException = false; try { // if atomic then get exclusive lock, else shared lock rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock); @@ -3207,26 +3179,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // NOTE: We will retry when other exceptions, but we should stop if we receive // TimeoutIOException or InterruptedIOException as operation has timed out or // interrupted respectively. - throwException = true; throw e; } catch (IOException ioe) { LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); if (isAtomic()) { // fail, atomic means all or none - throwException = true; throw ioe; } - } catch (Throwable throwable) { - throwException = true; - throw throwable; - } finally { - if (throwException) { - region.storeHotnessProtector.finish(curFamilyCellMap); - } } if (rowLock == null) { // We failed to grab another lock if (isAtomic()) { - region.storeHotnessProtector.finish(curFamilyCellMap); throw new IOException("Can't apply all operations atomically!"); } break; // Stop acquiring more rows for this batch @@ -3312,38 +3274,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public void doPostOpCleanupForMiniBatch( final MiniBatchOperationInProgress miniBatchOp, final WALEdit walEdit, - boolean success) throws IOException { - doFinishHotnessProtector(miniBatchOp); - } - - private void doFinishHotnessProtector( - final MiniBatchOperationInProgress miniBatchOp) { - // check and return if the protector is not enabled - if (!region.storeHotnessProtector.isEnable()) { - return; - } - // miniBatchOp is null, if and only if lockRowsAndBuildMiniBatch throwing exception. - // This case was handled. - if (miniBatchOp == null) { - return; - } - - final int finalLastIndexExclusive = miniBatchOp.getLastIndexExclusive(); - - for (int i = nextIndexToProcess; i < finalLastIndexExclusive; i++) { - switch (retCodeDetails[i].getOperationStatusCode()) { - case SUCCESS: - case FAILURE: - region.storeHotnessProtector.finish(getMutation(i).getFamilyCellMap()); - break; - default: - // do nothing - // We won't start the protector for NOT_RUN/BAD_FAMILY/SANITY_CHECK_FAILURE and the - // STORE_TOO_BUSY case is handled in StoreHotnessProtector#start - break; - } - } - } + boolean success) throws IOException {} /** * Atomically apply the given map of family->edits to the memstore. @@ -3562,8 +3493,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress miniBatchOp, final WALEdit walEdit, boolean success) throws IOException { - - super.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, success); if (miniBatchOp != null) { // synced so that the coprocessor contract is adhered to. if (region.coprocessorHost != null) { @@ -4157,8 +4086,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg()); - } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) { - throw new RegionTooBusyException(batchMutate[0].getExceptionMsg()); } } @@ -7962,7 +7889,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 3 * Bytes.SIZEOF_BOOLEAN); @@ -7989,7 +7916,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress + ClassSize.STORE_SERVICES // store services - + StoreHotnessProtector.FIXED_SIZE ; @Override @@ -8454,7 +8380,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ @Override public void onConfigurationChange(Configuration conf) { - this.storeHotnessProtector.update(conf); + // Do nothing for now. } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 1a382ad2471..68a057a7013 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1,4 +1,4 @@ -/* +/** * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -42,7 +42,6 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -179,9 +178,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat private final boolean verifyBulkLoads; - private final AtomicInteger currentParallelPutCount = new AtomicInteger(0); - private final int parallelPutCountPrintThreshold; - private ScanInfo scanInfo; // All access must be synchronized. @@ -296,6 +292,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction }); } + LOG.debug("Memstore type={}", className); this.offPeakHours = OffPeakHours.getInstance(conf); // Setting up cache configuration for this family @@ -334,14 +331,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat + flushRetriesNumber); } cryptoContext = EncryptionUtil.createEncryptionContext(conf, family); - - int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50); - if (confPrintThreshold < 10) { - confPrintThreshold = 10; - } - this.parallelPutCountPrintThreshold = confPrintThreshold; - LOG.info("Memstore class name is " + className + " ; parallelPutCountPrintThreshold=" - + parallelPutCountPrintThreshold); } /** @@ -705,16 +694,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat public void add(final Cell cell, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { - if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { - if (LOG.isTraceEnabled()) { - LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this - .getColumnFamilyName() + " too Busy!"); - } - } - this.memstore.add(cell, memstoreSizing); + this.memstore.add(cell, memstoreSizing); } finally { lock.readLock().unlock(); - currentParallelPutCount.decrementAndGet(); } } @@ -724,16 +706,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat public void add(final Iterable cells, MemStoreSizing memstoreSizing) { lock.readLock().lock(); try { - if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { - if (LOG.isTraceEnabled()) { - LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this - .getColumnFamilyName() + " too Busy!"); - } - } memstore.add(cells, memstoreSizing); } finally { lock.readLock().unlock(); - currentParallelPutCount.decrementAndGet(); } } @@ -2351,8 +2326,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) - + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); + ClassSize.align(ClassSize.OBJECT + (26 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK @@ -2601,8 +2576,4 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat lock.writeLock().unlock(); } } - - public int getCurrentParallelPutCount() { - return currentParallelPutCount.get(); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index acf25cff647..cceae285277 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.PrivateCellUtil; -import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -1040,11 +1039,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.addResultOrException(getResultOrException( ClientProtos.Result.getDefaultInstance(), index)); break; - - case STORE_TOO_BUSY: - e = new RegionTooBusyException(codes[i].getExceptionMsg()); - builder.addResultOrException(getResultOrException(e, index)); - break; } } } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 6eb9f18b703..042129f5246 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -280,6 +280,4 @@ public interface Store { * @return true if the memstore may need some extra memory space */ boolean isSloppyMemStore(); - - int getCurrentParallelPutCount(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java deleted file mode 100644 index a237a52dc6c..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.throttle; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.RegionTooBusyException; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -/** - * StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it - * does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with - * dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM - * degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented, - * while StoreHotnessProtector is system-oriented, RS-self-protected mechanism. - *

- * There are three key parameters: - *

- * 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this - * threshold, the HotProtector will work, 100 by default - *

- * 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at - * the same time. - *

- * 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to - * prepare writing puts to a Store at the same time. - *

- * Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and - * WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not - * enough since the actual concurrency of puts may still exceed the limit when MVCC contention or - * slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed. - *

- * This protector is enabled by default and could be turned off by setting - * hbase.region.store.parallel.put.limit to 0, supporting online configuration change. - */ -@InterfaceAudience.Private -public class StoreHotnessProtector { - private static final Log LOG = LogFactory.getLog(StoreHotnessProtector.class); - private volatile int parallelPutToStoreThreadLimit; - - private volatile int parallelPreparePutToStoreThreadLimit; - public final static String PARALLEL_PUT_STORE_THREADS_LIMIT = - "hbase.region.store.parallel.put.limit"; - public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = - "hbase.region.store.parallel.prepare.put.multiplier"; - private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10; - private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount; - public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT = - "hbase.region.store.parallel.put.limit.min.column.count"; - private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100; - private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2; - - private final Map preparePutToStoreMap = - new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR); - private final Region region; - - public StoreHotnessProtector(Region region, Configuration conf) { - init(conf); - this.region = region; - } - - public void init(Configuration conf) { - this.parallelPutToStoreThreadLimit = - conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT); - this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, - DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit; - this.parallelPutToStoreThreadLimitCheckMinColumnCount = - conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, - DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM); - - } - - public void update(Configuration conf) { - init(conf); - preparePutToStoreMap.clear(); - LOG.debug("update config: " + toString()); - } - - public void start(Map> familyMaps) throws RegionTooBusyException { - if (!isEnable()) { - return; - } - - String tooBusyStore = null; - - for (Map.Entry> e : familyMaps.entrySet()) { - Store store = this.region.getStore(e.getKey()); - if (store == null || e.getValue() == null) { - continue; - } - - if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { - - //we need to try to add #preparePutCount at first because preparePutToStoreMap will be - //cleared when changing the configuration. - preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger()); - AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey()); - if (preparePutCounter == null) { - preparePutCounter = new AtomicInteger(); - preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter); - } - int preparePutCount = preparePutCounter.incrementAndGet(); - if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit - || preparePutCount > this.parallelPreparePutToStoreThreadLimit) { - tooBusyStore = (tooBusyStore == null ? - store.getColumnFamilyName() : - tooBusyStore + "," + store.getColumnFamilyName()); - } - - if (LOG.isTraceEnabled()) { - LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount - + "; currentParallelPutCount=" + store.getCurrentParallelPutCount()); - } - } - } - - if (tooBusyStore != null) { - String msg = - "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore - + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")"; - if (LOG.isTraceEnabled()) { - LOG.trace(msg); - } - throw new RegionTooBusyException(msg); - } - } - - public void finish(Map> familyMaps) { - if (!isEnable()) { - return; - } - - for (Map.Entry> e : familyMaps.entrySet()) { - Store store = this.region.getStore(e.getKey()); - if (store == null || e.getValue() == null) { - continue; - } - if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) { - AtomicInteger counter = preparePutToStoreMap.get(e.getKey()); - // preparePutToStoreMap will be cleared when changing the configuration, so it may turn - // into a negative value. It will be not accuracy in a short time, it's a trade-off for - // performance. - if (counter != null && counter.decrementAndGet() < 0) { - counter.incrementAndGet(); - } - } - } - } - - public String toString() { - return "StoreHotnessProtector, parallelPutToStoreThreadLimit=" - + this.parallelPutToStoreThreadLimit + " ; minColumnNum=" - + this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit=" - + this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ? - "enable" : - "disable"); - } - - public boolean isEnable() { - // feature is enabled when parallelPutToStoreThreadLimit > 0 - return this.parallelPutToStoreThreadLimit > 0; - } - - @VisibleForTesting - Map getPreparePutToStoreMap() { - return preparePutToStoreMap; - } - - public static final long FIXED_SIZE = - ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT); -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 2d454e58c1c..f979397f453 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.regionserver.MutableSegment; import org.apache.hadoop.hbase.regionserver.Segment; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker; -import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ClassSize; @@ -477,14 +476,6 @@ public class TestHeapSize { assertEquals(expected, actual); } - cl = StoreHotnessProtector.class; - actual = StoreHotnessProtector.FIXED_SIZE; - expected = ClassSize.estimateBase(cl, false); - if (expected != actual) { - ClassSize.estimateBase(cl, true); - assertEquals(expected, actual); - } - // Block cache key overhead. Only tests fixed overhead as estimating heap // size of strings is hard. cl = BlockCacheKey.class; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java deleted file mode 100644 index 6d4193474b2..00000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestStoreHotnessProtector.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.throttle; - -import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER; -import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT; -import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.RegionTooBusyException; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - -@Category(SmallTests.class) -public class TestStoreHotnessProtector { - - @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestStoreHotnessProtector.class); - - @Test(timeout = 60000) - public void testPreparePutCounter() throws Exception { - - ExecutorService executorService = Executors.newFixedThreadPool(10); - - Configuration conf = new Configuration(); - conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0); - conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10); - conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); - Region mockRegion = mock(Region.class); - StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf); - - Store mockStore1 = mock(Store.class); - RegionInfo mockRegionInfo = mock(RegionInfo.class); - byte[] family = "testF1".getBytes(); - - when(mockRegion.getStore(family)).thenReturn(mockStore1); - when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); - when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1"); - - when(mockStore1.getCurrentParallelPutCount()).thenReturn(1); - when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1"); - - final Map> familyMaps = new HashMap<>(); - familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class))); - - final AtomicReference exception = new AtomicReference<>(); - - // PreparePutCounter not access limit - - int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10) * conf - .getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); - CountDownLatch countDownLatch = new CountDownLatch(threadCount); - - for (int i = 0; i < threadCount; i++) { - executorService.execute(() -> { - try { - storeHotnessProtector.start(familyMaps); - } catch (RegionTooBusyException e) { - e.printStackTrace(); - exception.set(e); - } finally { - countDownLatch.countDown(); - } - }); - } - - countDownLatch.await(60, TimeUnit.SECONDS); - //no exception - Assert.assertEquals(exception.get(), null); - Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1); - Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), - threadCount); - - // access limit - - try { - storeHotnessProtector.start(familyMaps); - } catch (RegionTooBusyException e) { - e.printStackTrace(); - exception.set(e); - } - - Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class); - - Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1); - // when access limit, counter will not changed. - Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), - threadCount + 1); - - storeHotnessProtector.finish(familyMaps); - Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), - threadCount); - } - -} \ No newline at end of file