Revert "HBASE-19389 Limit concurrency of put with dense (hundreds) columns to prevent write handler exhausted"
This reverts commit 98ac4f12b5
.
This commit is contained in:
parent
98ac4f12b5
commit
9a26af37b9
|
@ -89,7 +89,6 @@ public final class HConstants {
|
||||||
NOT_RUN,
|
NOT_RUN,
|
||||||
SUCCESS,
|
SUCCESS,
|
||||||
BAD_FAMILY,
|
BAD_FAMILY,
|
||||||
STORE_TOO_BUSY,
|
|
||||||
SANITY_CHECK_FAILURE,
|
SANITY_CHECK_FAILURE,
|
||||||
FAILURE
|
FAILURE
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/*
|
/**
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -146,7 +146,6 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
|
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
@ -675,8 +674,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
|
private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
|
||||||
Bytes.BYTES_COMPARATOR);
|
Bytes.BYTES_COMPARATOR);
|
||||||
|
|
||||||
private final StoreHotnessProtector storeHotnessProtector;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HRegion constructor. This constructor should only be used for testing and
|
* HRegion constructor. This constructor should only be used for testing and
|
||||||
* extensions. Instances of HRegion should be instantiated with the
|
* extensions. Instances of HRegion should be instantiated with the
|
||||||
|
@ -797,9 +794,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
|
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
|
||||||
this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ?
|
this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ?
|
||||||
DEFAULT_DURABILITY : htd.getDurability();
|
DEFAULT_DURABILITY : htd.getDurability();
|
||||||
|
|
||||||
this.storeHotnessProtector = new StoreHotnessProtector(this, conf);
|
|
||||||
|
|
||||||
if (rsServices != null) {
|
if (rsServices != null) {
|
||||||
this.rsAccounting = this.rsServices.getRegionServerAccounting();
|
this.rsAccounting = this.rsServices.getRegionServerAccounting();
|
||||||
// don't initialize coprocessors if not running within a regionserver
|
// don't initialize coprocessors if not running within a regionserver
|
||||||
|
@ -812,8 +806,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
this.metricsRegion = null;
|
this.metricsRegion = null;
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
// Write out region name, its encoded name and storeHotnessProtector as string.
|
// Write out region name as string and its encoded name.
|
||||||
LOG.debug("Instantiated " + this +"; "+ storeHotnessProtector.toString());
|
LOG.debug("Instantiated " + this);
|
||||||
}
|
}
|
||||||
|
|
||||||
configurationManager = Optional.empty();
|
configurationManager = Optional.empty();
|
||||||
|
@ -3186,31 +3180,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
if (!isOperationPending(lastIndexExclusive)) {
|
if (!isOperationPending(lastIndexExclusive)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// HBASE-19389 Limit concurrency of put with dense (hundreds) columns to avoid exhausting
|
|
||||||
// RS handlers, covering both MutationBatchOperation and ReplayBatchOperation
|
|
||||||
// The BAD_FAMILY/SANITY_CHECK_FAILURE cases are handled in checkAndPrepare phase and won't
|
|
||||||
// pass the isOperationPending check
|
|
||||||
Map<byte[], List<Cell>> curFamilyCellMap =
|
|
||||||
getMutation(lastIndexExclusive).getFamilyCellMap();
|
|
||||||
try {
|
|
||||||
// start the protector before acquiring row lock considering performance, and will finish
|
|
||||||
// it when encountering exception
|
|
||||||
region.storeHotnessProtector.start(curFamilyCellMap);
|
|
||||||
} catch (RegionTooBusyException rtbe) {
|
|
||||||
region.storeHotnessProtector.finish(curFamilyCellMap);
|
|
||||||
if (isAtomic()) {
|
|
||||||
throw rtbe;
|
|
||||||
}
|
|
||||||
retCodeDetails[lastIndexExclusive] =
|
|
||||||
new OperationStatus(OperationStatusCode.STORE_TOO_BUSY, rtbe.getMessage());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Mutation mutation = getMutation(lastIndexExclusive);
|
Mutation mutation = getMutation(lastIndexExclusive);
|
||||||
// If we haven't got any rows in our batch, we should block to get the next one.
|
// If we haven't got any rows in our batch, we should block to get the next one.
|
||||||
RowLock rowLock = null;
|
RowLock rowLock = null;
|
||||||
boolean throwException = false;
|
|
||||||
try {
|
try {
|
||||||
// if atomic then get exclusive lock, else shared lock
|
// if atomic then get exclusive lock, else shared lock
|
||||||
rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
|
rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
|
||||||
|
@ -3218,26 +3190,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
// NOTE: We will retry when other exceptions, but we should stop if we receive
|
// NOTE: We will retry when other exceptions, but we should stop if we receive
|
||||||
// TimeoutIOException or InterruptedIOException as operation has timed out or
|
// TimeoutIOException or InterruptedIOException as operation has timed out or
|
||||||
// interrupted respectively.
|
// interrupted respectively.
|
||||||
throwException = true;
|
|
||||||
throw e;
|
throw e;
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
|
LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
|
||||||
if (isAtomic()) { // fail, atomic means all or none
|
if (isAtomic()) { // fail, atomic means all or none
|
||||||
throwException = true;
|
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
}
|
||||||
} catch (Throwable throwable) {
|
|
||||||
throwException = true;
|
|
||||||
throw throwable;
|
|
||||||
} finally {
|
|
||||||
if (throwException) {
|
|
||||||
region.storeHotnessProtector.finish(curFamilyCellMap);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (rowLock == null) {
|
if (rowLock == null) {
|
||||||
// We failed to grab another lock
|
// We failed to grab another lock
|
||||||
if (isAtomic()) {
|
if (isAtomic()) {
|
||||||
region.storeHotnessProtector.finish(curFamilyCellMap);
|
|
||||||
throw new IOException("Can't apply all operations atomically!");
|
throw new IOException("Can't apply all operations atomically!");
|
||||||
}
|
}
|
||||||
break; // Stop acquiring more rows for this batch
|
break; // Stop acquiring more rows for this batch
|
||||||
|
@ -3323,38 +3285,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
|
|
||||||
public void doPostOpCleanupForMiniBatch(
|
public void doPostOpCleanupForMiniBatch(
|
||||||
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit,
|
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit,
|
||||||
boolean success) throws IOException {
|
boolean success) throws IOException {}
|
||||||
doFinishHotnessProtector(miniBatchOp);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void doFinishHotnessProtector(
|
|
||||||
final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
|
|
||||||
// check and return if the protector is not enabled
|
|
||||||
if (!region.storeHotnessProtector.isEnable()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// miniBatchOp is null, if and only if lockRowsAndBuildMiniBatch throwing exception.
|
|
||||||
// This case was handled.
|
|
||||||
if (miniBatchOp == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
final int finalLastIndexExclusive = miniBatchOp.getLastIndexExclusive();
|
|
||||||
|
|
||||||
for (int i = nextIndexToProcess; i < finalLastIndexExclusive; i++) {
|
|
||||||
switch (retCodeDetails[i].getOperationStatusCode()) {
|
|
||||||
case SUCCESS:
|
|
||||||
case FAILURE:
|
|
||||||
region.storeHotnessProtector.finish(getMutation(i).getFamilyCellMap());
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
// do nothing
|
|
||||||
// We won't start the protector for NOT_RUN/BAD_FAMILY/SANITY_CHECK_FAILURE and the
|
|
||||||
// STORE_TOO_BUSY case is handled in StoreHotnessProtector#start
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Atomically apply the given map of family->edits to the memstore.
|
* Atomically apply the given map of family->edits to the memstore.
|
||||||
|
@ -3573,8 +3504,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
@Override
|
@Override
|
||||||
public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
|
public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
|
||||||
final WALEdit walEdit, boolean success) throws IOException {
|
final WALEdit walEdit, boolean success) throws IOException {
|
||||||
|
|
||||||
super.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, success);
|
|
||||||
if (miniBatchOp != null) {
|
if (miniBatchOp != null) {
|
||||||
// synced so that the coprocessor contract is adhered to.
|
// synced so that the coprocessor contract is adhered to.
|
||||||
if (region.coprocessorHost != null) {
|
if (region.coprocessorHost != null) {
|
||||||
|
@ -4168,8 +4097,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
|
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
|
||||||
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
|
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
|
||||||
throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
|
throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
|
||||||
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) {
|
|
||||||
throw new RegionTooBusyException(batchMutate[0].getExceptionMsg());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7973,7 +7900,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||||
ClassSize.OBJECT +
|
ClassSize.OBJECT +
|
||||||
ClassSize.ARRAY +
|
ClassSize.ARRAY +
|
||||||
51 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
|
50 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
|
||||||
(14 * Bytes.SIZEOF_LONG) +
|
(14 * Bytes.SIZEOF_LONG) +
|
||||||
3 * Bytes.SIZEOF_BOOLEAN);
|
3 * Bytes.SIZEOF_BOOLEAN);
|
||||||
|
|
||||||
|
@ -8000,7 +7927,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
+ 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes
|
+ 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes
|
||||||
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
|
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
|
||||||
+ ClassSize.STORE_SERVICES // store services
|
+ ClassSize.STORE_SERVICES // store services
|
||||||
+ StoreHotnessProtector.FIXED_SIZE
|
|
||||||
;
|
;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -8465,7 +8391,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void onConfigurationChange(Configuration conf) {
|
public void onConfigurationChange(Configuration conf) {
|
||||||
this.storeHotnessProtector.update(conf);
|
// Do nothing for now.
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/*
|
/**
|
||||||
*
|
*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
@ -43,7 +43,6 @@ import java.util.concurrent.ExecutorCompletionService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -182,9 +181,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
|
|
||||||
private final boolean verifyBulkLoads;
|
private final boolean verifyBulkLoads;
|
||||||
|
|
||||||
private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);
|
|
||||||
private final int parallelPutCountPrintThreshold;
|
|
||||||
|
|
||||||
private ScanInfo scanInfo;
|
private ScanInfo scanInfo;
|
||||||
|
|
||||||
// All access must be synchronized.
|
// All access must be synchronized.
|
||||||
|
@ -299,6 +295,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
|
this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
|
||||||
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
|
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
|
||||||
}
|
}
|
||||||
|
LOG.debug("Memstore type={}", className);
|
||||||
this.offPeakHours = OffPeakHours.getInstance(conf);
|
this.offPeakHours = OffPeakHours.getInstance(conf);
|
||||||
|
|
||||||
// Setting up cache configuration for this family
|
// Setting up cache configuration for this family
|
||||||
|
@ -337,14 +334,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
+ flushRetriesNumber);
|
+ flushRetriesNumber);
|
||||||
}
|
}
|
||||||
cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
|
cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
|
||||||
|
|
||||||
int confPrintThreshold = conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);
|
|
||||||
if (confPrintThreshold < 10) {
|
|
||||||
confPrintThreshold = 10;
|
|
||||||
}
|
|
||||||
this.parallelPutCountPrintThreshold = confPrintThreshold;
|
|
||||||
LOG.info("Memstore class name is " + className + " ; parallelPutCountPrintThreshold="
|
|
||||||
+ parallelPutCountPrintThreshold);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -708,16 +697,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
public void add(final Cell cell, MemStoreSizing memstoreSizing) {
|
public void add(final Cell cell, MemStoreSizing memstoreSizing) {
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
|
this.memstore.add(cell, memstoreSizing);
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
|
|
||||||
.getColumnFamilyName() + " too Busy!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.memstore.add(cell, memstoreSizing);
|
|
||||||
} finally {
|
} finally {
|
||||||
lock.readLock().unlock();
|
lock.readLock().unlock();
|
||||||
currentParallelPutCount.decrementAndGet();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -727,16 +709,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
|
public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace(this.getTableName() + ":" + this.getRegionInfo().getEncodedName() + ":" + this
|
|
||||||
.getColumnFamilyName() + " too Busy!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
memstore.add(cells, memstoreSizing);
|
memstore.add(cells, memstoreSizing);
|
||||||
} finally {
|
} finally {
|
||||||
lock.readLock().unlock();
|
lock.readLock().unlock();
|
||||||
currentParallelPutCount.decrementAndGet();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2393,8 +2368,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final long FIXED_OVERHEAD =
|
public static final long FIXED_OVERHEAD =
|
||||||
ClassSize.align(ClassSize.OBJECT + (27 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
|
ClassSize.align(ClassSize.OBJECT + (26 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
|
||||||
+ (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
|
+ (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
|
||||||
|
|
||||||
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
|
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
|
||||||
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
|
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
|
||||||
|
@ -2710,8 +2685,4 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getCurrentParallelPutCount() {
|
|
||||||
return currentParallelPutCount.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
|
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
@ -1040,11 +1039,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
builder.addResultOrException(getResultOrException(
|
builder.addResultOrException(getResultOrException(
|
||||||
ClientProtos.Result.getDefaultInstance(), index));
|
ClientProtos.Result.getDefaultInstance(), index));
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case STORE_TOO_BUSY:
|
|
||||||
e = new RegionTooBusyException(codes[i].getExceptionMsg());
|
|
||||||
builder.addResultOrException(getResultOrException(e, index));
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -280,6 +280,4 @@ public interface Store {
|
||||||
* @return true if the memstore may need some extra memory space
|
* @return true if the memstore may need some extra memory space
|
||||||
*/
|
*/
|
||||||
boolean isSloppyMemStore();
|
boolean isSloppyMemStore();
|
||||||
|
|
||||||
int getCurrentParallelPutCount();
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,196 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.regionserver.throttle;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
|
||||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.Region;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.Store;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.util.ClassSize;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
||||||
/**
|
|
||||||
* StoreHotnessProtector is designed to help limit the concurrency of puts with dense columns, it
|
|
||||||
* does best-effort to avoid exhausting all RS's handlers. When a lot of clients write requests with
|
|
||||||
* dense (hundreds) columns to a Store at the same time, it will lead to blocking of RS because CSLM
|
|
||||||
* degrades when concurrency goes up. It's not a kind of throttling. Throttling is user-oriented,
|
|
||||||
* while StoreHotnessProtector is system-oriented, RS-self-protected mechanism.
|
|
||||||
* <p>
|
|
||||||
* There are three key parameters:
|
|
||||||
* <p>
|
|
||||||
* 1. parallelPutToStoreThreadLimitCheckMinColumnCount: If the amount of columns exceed this
|
|
||||||
* threshold, the HotProtector will work, 100 by default
|
|
||||||
* <p>
|
|
||||||
* 2. parallelPutToStoreThreadLimit: The amount of concurrency allowed to write puts to a Store at
|
|
||||||
* the same time.
|
|
||||||
* <p>
|
|
||||||
* 3. parallelPreparePutToStoreThreadLimit: The amount of concurrency allowed to
|
|
||||||
* prepare writing puts to a Store at the same time.
|
|
||||||
* <p>
|
|
||||||
* Notice that our writing pipeline includes three key process: MVCC acquire, writing MemStore, and
|
|
||||||
* WAL. Only limit the concurrency of writing puts to Store(parallelPutToStoreThreadLimit) is not
|
|
||||||
* enough since the actual concurrency of puts may still exceed the limit when MVCC contention or
|
|
||||||
* slow WAL sync happens. This is why parallelPreparePutToStoreThreadLimit is needed.
|
|
||||||
* <p>
|
|
||||||
* This protector is enabled by default and could be turned off by setting
|
|
||||||
* hbase.region.store.parallel.put.limit to 0, supporting online configuration change.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class StoreHotnessProtector {
|
|
||||||
private static final Log LOG = LogFactory.getLog(StoreHotnessProtector.class);
|
|
||||||
private volatile int parallelPutToStoreThreadLimit;
|
|
||||||
|
|
||||||
private volatile int parallelPreparePutToStoreThreadLimit;
|
|
||||||
public final static String PARALLEL_PUT_STORE_THREADS_LIMIT =
|
|
||||||
"hbase.region.store.parallel.put.limit";
|
|
||||||
public final static String PARALLEL_PREPARE_PUT_STORE_MULTIPLIER =
|
|
||||||
"hbase.region.store.parallel.prepare.put.multiplier";
|
|
||||||
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT = 10;
|
|
||||||
private volatile int parallelPutToStoreThreadLimitCheckMinColumnCount;
|
|
||||||
public final static String PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT =
|
|
||||||
"hbase.region.store.parallel.put.limit.min.column.count";
|
|
||||||
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
|
|
||||||
private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
|
|
||||||
|
|
||||||
private final Map<byte[], AtomicInteger> preparePutToStoreMap =
|
|
||||||
new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
|
|
||||||
private final Region region;
|
|
||||||
|
|
||||||
public StoreHotnessProtector(Region region, Configuration conf) {
|
|
||||||
init(conf);
|
|
||||||
this.region = region;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void init(Configuration conf) {
|
|
||||||
this.parallelPutToStoreThreadLimit =
|
|
||||||
conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT);
|
|
||||||
this.parallelPreparePutToStoreThreadLimit = conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER,
|
|
||||||
DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER) * parallelPutToStoreThreadLimit;
|
|
||||||
this.parallelPutToStoreThreadLimitCheckMinColumnCount =
|
|
||||||
conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT,
|
|
||||||
DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public void update(Configuration conf) {
|
|
||||||
init(conf);
|
|
||||||
preparePutToStoreMap.clear();
|
|
||||||
LOG.debug("update config: " + toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
|
|
||||||
if (!isEnable()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
String tooBusyStore = null;
|
|
||||||
|
|
||||||
for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
|
|
||||||
Store store = this.region.getStore(e.getKey());
|
|
||||||
if (store == null || e.getValue() == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
|
|
||||||
|
|
||||||
//we need to try to add #preparePutCount at first because preparePutToStoreMap will be
|
|
||||||
//cleared when changing the configuration.
|
|
||||||
preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger());
|
|
||||||
AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey());
|
|
||||||
if (preparePutCounter == null) {
|
|
||||||
preparePutCounter = new AtomicInteger();
|
|
||||||
preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter);
|
|
||||||
}
|
|
||||||
int preparePutCount = preparePutCounter.incrementAndGet();
|
|
||||||
if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
|
|
||||||
|| preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
|
|
||||||
tooBusyStore = (tooBusyStore == null ?
|
|
||||||
store.getColumnFamilyName() :
|
|
||||||
tooBusyStore + "," + store.getColumnFamilyName());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace(store.getColumnFamilyName() + ": preparePutCount=" + preparePutCount
|
|
||||||
+ "; currentParallelPutCount=" + store.getCurrentParallelPutCount());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tooBusyStore != null) {
|
|
||||||
String msg =
|
|
||||||
"StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
|
|
||||||
+ " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace(msg);
|
|
||||||
}
|
|
||||||
throw new RegionTooBusyException(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void finish(Map<byte[], List<Cell>> familyMaps) {
|
|
||||||
if (!isEnable()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (Map.Entry<byte[], List<Cell>> e : familyMaps.entrySet()) {
|
|
||||||
Store store = this.region.getStore(e.getKey());
|
|
||||||
if (store == null || e.getValue() == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (e.getValue().size() > this.parallelPutToStoreThreadLimitCheckMinColumnCount) {
|
|
||||||
AtomicInteger counter = preparePutToStoreMap.get(e.getKey());
|
|
||||||
// preparePutToStoreMap will be cleared when changing the configuration, so it may turn
|
|
||||||
// into a negative value. It will be not accuracy in a short time, it's a trade-off for
|
|
||||||
// performance.
|
|
||||||
if (counter != null && counter.decrementAndGet() < 0) {
|
|
||||||
counter.incrementAndGet();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public String toString() {
|
|
||||||
return "StoreHotnessProtector, parallelPutToStoreThreadLimit="
|
|
||||||
+ this.parallelPutToStoreThreadLimit + " ; minColumnNum="
|
|
||||||
+ this.parallelPutToStoreThreadLimitCheckMinColumnCount + " ; preparePutThreadLimit="
|
|
||||||
+ this.parallelPreparePutToStoreThreadLimit + " ; hotProtect now " + (this.isEnable() ?
|
|
||||||
"enable" :
|
|
||||||
"disable");
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isEnable() {
|
|
||||||
// feature is enabled when parallelPutToStoreThreadLimit > 0
|
|
||||||
return this.parallelPutToStoreThreadLimit > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
Map<byte[], AtomicInteger> getPreparePutToStoreMap() {
|
|
||||||
return preparePutToStoreMap;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static final long FIXED_SIZE =
|
|
||||||
ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT);
|
|
||||||
}
|
|
|
@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.regionserver.MutableSegment;
|
||||||
import org.apache.hadoop.hbase.regionserver.Segment;
|
import org.apache.hadoop.hbase.regionserver.Segment;
|
||||||
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker;
|
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.NonSyncTimeRangeTracker;
|
||||||
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker;
|
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker.SyncTimeRangeTracker;
|
||||||
import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.ClassSize;
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
|
@ -477,14 +476,6 @@ public class TestHeapSize {
|
||||||
assertEquals(expected, actual);
|
assertEquals(expected, actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
cl = StoreHotnessProtector.class;
|
|
||||||
actual = StoreHotnessProtector.FIXED_SIZE;
|
|
||||||
expected = ClassSize.estimateBase(cl, false);
|
|
||||||
if (expected != actual) {
|
|
||||||
ClassSize.estimateBase(cl, true);
|
|
||||||
assertEquals(expected, actual);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Block cache key overhead. Only tests fixed overhead as estimating heap
|
// Block cache key overhead. Only tests fixed overhead as estimating heap
|
||||||
// size of strings is hard.
|
// size of strings is hard.
|
||||||
cl = BlockCacheKey.class;
|
cl = BlockCacheKey.class;
|
||||||
|
|
|
@ -1,130 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
* <p>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.regionserver.throttle;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER;
|
|
||||||
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT;
|
|
||||||
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|
||||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.Region;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.Store;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
@Category(SmallTests.class)
|
|
||||||
public class TestStoreHotnessProtector {
|
|
||||||
|
|
||||||
@ClassRule public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestStoreHotnessProtector.class);
|
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
|
||||||
public void testPreparePutCounter() throws Exception {
|
|
||||||
|
|
||||||
ExecutorService executorService = Executors.newFixedThreadPool(10);
|
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0);
|
|
||||||
conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10);
|
|
||||||
conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
|
|
||||||
Region mockRegion = mock(Region.class);
|
|
||||||
StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf);
|
|
||||||
|
|
||||||
Store mockStore1 = mock(Store.class);
|
|
||||||
RegionInfo mockRegionInfo = mock(RegionInfo.class);
|
|
||||||
byte[] family = "testF1".getBytes();
|
|
||||||
|
|
||||||
when(mockRegion.getStore(family)).thenReturn(mockStore1);
|
|
||||||
when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
|
|
||||||
when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1");
|
|
||||||
|
|
||||||
when(mockStore1.getCurrentParallelPutCount()).thenReturn(1);
|
|
||||||
when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1");
|
|
||||||
|
|
||||||
final Map<byte[], List<Cell>> familyMaps = new HashMap<>();
|
|
||||||
familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class)));
|
|
||||||
|
|
||||||
final AtomicReference<Exception> exception = new AtomicReference<>();
|
|
||||||
|
|
||||||
// PreparePutCounter not access limit
|
|
||||||
|
|
||||||
int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10) * conf
|
|
||||||
.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
|
|
||||||
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
|
|
||||||
|
|
||||||
for (int i = 0; i < threadCount; i++) {
|
|
||||||
executorService.execute(() -> {
|
|
||||||
try {
|
|
||||||
storeHotnessProtector.start(familyMaps);
|
|
||||||
} catch (RegionTooBusyException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
exception.set(e);
|
|
||||||
} finally {
|
|
||||||
countDownLatch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
countDownLatch.await(60, TimeUnit.SECONDS);
|
|
||||||
//no exception
|
|
||||||
Assert.assertEquals(exception.get(), null);
|
|
||||||
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
|
|
||||||
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
|
|
||||||
threadCount);
|
|
||||||
|
|
||||||
// access limit
|
|
||||||
|
|
||||||
try {
|
|
||||||
storeHotnessProtector.start(familyMaps);
|
|
||||||
} catch (RegionTooBusyException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
exception.set(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class);
|
|
||||||
|
|
||||||
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
|
|
||||||
// when access limit, counter will not changed.
|
|
||||||
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
|
|
||||||
threadCount + 1);
|
|
||||||
|
|
||||||
storeHotnessProtector.finish(familyMaps);
|
|
||||||
Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
|
|
||||||
threadCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue