HBASE-25212 [branch-2] Optionally abort requests in progress after deciding a region should close (#2575)
If hbase.regionserver.close.wait.abort is set to true, interrupt RPC handler threads holding the region close lock. Until requests in progress can be aborted, wait on the region close lock for a configurable interval (specified by hbase.regionserver.close.wait.time.ms, default 60000 (1 minute)). If we have failed to acquire the close lock after this interval elapses, if allowed (also specified by hbase.regionserver.close.wait.abort), abort the regionserver. We will attempt to interrupt any running handlers every hbase.regionserver.close.wait.interval.ms (default 10000 (10 seconds)) until either the close lock is acquired or we reach the maximum wait time. Define a subset of region operations as interruptible. Track threads holding the close lock transiting those operations. Set the thread interrupt status of tracked threads when trying to close the region. Use the thread interrupt status where safe to break out of request processing. Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Reid Chan <reidchan@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
729518af9d
commit
1cc39ffd59
|
@ -683,7 +683,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// Last flush time for each Store. Useful when we are flushing for each column
|
||||
private final ConcurrentMap<HStore, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
|
||||
|
||||
final RegionServerServices rsServices;
|
||||
protected RegionServerServices rsServices;
|
||||
private RegionServerAccounting rsAccounting;
|
||||
private long flushCheckInterval;
|
||||
// flushPerChanges is to prevent too many changes in memstore
|
||||
|
@ -691,6 +691,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
private long blockingMemStoreSize;
|
||||
// Used to guard closes
|
||||
final ReentrantReadWriteLock lock;
|
||||
// Used to track interruptible holders of the region lock. Currently that is only RPC handler
|
||||
// threads. Boolean value in map determines if lock holder can be interrupted, normally true,
|
||||
// but may be false when thread is transiting a critical section.
|
||||
final ConcurrentHashMap<Thread, Boolean> regionLockHolders;
|
||||
|
||||
// Stop updates lock
|
||||
private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
|
||||
|
@ -783,6 +787,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
MetaCellComparator.META_COMPARATOR : CellComparatorImpl.COMPARATOR;
|
||||
this.lock = new ReentrantReadWriteLock(conf.getBoolean(FAIR_REENTRANT_CLOSE_LOCK,
|
||||
DEFAULT_FAIR_REENTRANT_CLOSE_LOCK));
|
||||
this.regionLockHolders = new ConcurrentHashMap<>();
|
||||
this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
|
||||
DEFAULT_CACHE_FLUSH_INTERVAL);
|
||||
this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
|
||||
|
@ -1164,7 +1169,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
LOG.info("Setting FlushNonSloppyStoresFirstPolicy for the region=" + this);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
|
||||
throw throwOnInterrupt(e);
|
||||
} catch (ExecutionException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} finally {
|
||||
|
@ -1563,6 +1568,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
*/
|
||||
public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
|
||||
|
||||
public static final String CLOSE_WAIT_ABORT = "hbase.regionserver.close.wait.abort";
|
||||
public static final boolean DEFAULT_CLOSE_WAIT_ABORT = false;
|
||||
public static final String CLOSE_WAIT_TIME = "hbase.regionserver.close.wait.time.ms";
|
||||
public static final long DEFAULT_CLOSE_WAIT_TIME = 60000; // 1 minute
|
||||
public static final String CLOSE_WAIT_INTERVAL = "hbase.regionserver.close.wait.interval.ms";
|
||||
public static final long DEFAULT_CLOSE_WAIT_INTERVAL = 10000; // 10 seconds
|
||||
|
||||
/**
|
||||
* Close down this HRegion. Flush the cache unless abort parameter is true,
|
||||
* Shut down each HStore, don't service any more calls.
|
||||
|
@ -1658,22 +1670,103 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
|
||||
if (timeoutForWriteLock == null
|
||||
|| timeoutForWriteLock == Long.MAX_VALUE) {
|
||||
// block waiting for the lock for closing
|
||||
lock.writeLock().lock(); // FindBugs: Complains UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine
|
||||
} else {
|
||||
try {
|
||||
boolean succeed = lock.writeLock().tryLock(timeoutForWriteLock, TimeUnit.SECONDS);
|
||||
if (!succeed) {
|
||||
throw new IOException("Failed to get write lock when closing region");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
|
||||
}
|
||||
}
|
||||
// Set the closing flag
|
||||
// From this point new arrivals at the region lock will get NSRE.
|
||||
|
||||
this.closing.set(true);
|
||||
LOG.info("Closing region {}", this);
|
||||
|
||||
// Acquire the close lock
|
||||
|
||||
// The configuration parameter CLOSE_WAIT_ABORT is overloaded to enable both
|
||||
// the new regionserver abort condition and interrupts for running requests.
|
||||
// If CLOSE_WAIT_ABORT is not enabled there is no change from earlier behavior,
|
||||
// we will not attempt to interrupt threads servicing requests nor crash out
|
||||
// the regionserver if something remains stubborn.
|
||||
|
||||
final boolean canAbort = conf.getBoolean(CLOSE_WAIT_ABORT, DEFAULT_CLOSE_WAIT_ABORT);
|
||||
boolean useTimedWait = false;
|
||||
if (timeoutForWriteLock != null && timeoutForWriteLock != Long.MAX_VALUE) {
|
||||
// convert legacy use of timeoutForWriteLock in seconds to new use in millis
|
||||
timeoutForWriteLock = TimeUnit.SECONDS.toMillis(timeoutForWriteLock);
|
||||
useTimedWait = true;
|
||||
} else if (canAbort) {
|
||||
timeoutForWriteLock = conf.getLong(CLOSE_WAIT_TIME, DEFAULT_CLOSE_WAIT_TIME);
|
||||
useTimedWait = true;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug((useTimedWait ? "Time limited wait" : "Waiting without time limit") +
|
||||
" for close lock on " + this);
|
||||
}
|
||||
final long closeWaitInterval = conf.getLong(CLOSE_WAIT_INTERVAL, DEFAULT_CLOSE_WAIT_INTERVAL);
|
||||
long elapsedWaitTime = 0;
|
||||
if (useTimedWait) {
|
||||
// Sanity check configuration
|
||||
long remainingWaitTime = timeoutForWriteLock;
|
||||
if (remainingWaitTime < closeWaitInterval) {
|
||||
LOG.warn("Time limit for close wait of " + timeoutForWriteLock +
|
||||
" ms is less than the configured lock acquisition wait interval " +
|
||||
closeWaitInterval + " ms, using wait interval as time limit");
|
||||
remainingWaitTime = closeWaitInterval;
|
||||
}
|
||||
boolean acquired = false;
|
||||
do {
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
try {
|
||||
acquired = lock.writeLock().tryLock(Math.min(remainingWaitTime, closeWaitInterval),
|
||||
TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
// Interrupted waiting for close lock. More likely the server is shutting down, not
|
||||
// normal operation, so aborting upon interrupt while waiting on this lock would not
|
||||
// provide much value. Throw an IOE (as IIOE) like we would in the case where we
|
||||
// fail to acquire the lock.
|
||||
String msg = "Interrupted while waiting for close lock on " + this;
|
||||
LOG.warn(msg, e);
|
||||
throw (InterruptedIOException) new InterruptedIOException(msg).initCause(e);
|
||||
}
|
||||
long elapsed = EnvironmentEdgeManager.currentTime() - start;
|
||||
elapsedWaitTime += elapsed;
|
||||
remainingWaitTime -= elapsed;
|
||||
if (canAbort && !acquired && remainingWaitTime > 0) {
|
||||
// Before we loop to wait again, interrupt all region operations that might
|
||||
// still be in progress, to encourage them to break out of waiting states or
|
||||
// inner loops, throw an exception to clients, and release the read lock via
|
||||
// endRegionOperation.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Interrupting region operations after waiting for close lock for " +
|
||||
elapsedWaitTime + " ms on " + this + ", " + remainingWaitTime +
|
||||
" ms remaining");
|
||||
}
|
||||
interruptRegionOperations();
|
||||
}
|
||||
} while (!acquired && remainingWaitTime > 0);
|
||||
|
||||
// If we fail to acquire the lock, trigger an abort if we can; otherwise throw an IOE
|
||||
// to let the caller know we could not proceed with the close.
|
||||
if (!acquired) {
|
||||
String msg = "Failed to acquire close lock on " + this + " after waiting " +
|
||||
elapsedWaitTime + " ms";
|
||||
LOG.error(msg);
|
||||
if (canAbort) {
|
||||
// If we failed to acquire the write lock, abort the server
|
||||
rsServices.abort(msg, null);
|
||||
}
|
||||
throw new IOException(msg);
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
lock.writeLock().lock();
|
||||
elapsedWaitTime = EnvironmentEdgeManager.currentTime() - start;
|
||||
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Acquired close lock on " + this + " after waiting " +
|
||||
elapsedWaitTime + " ms");
|
||||
}
|
||||
|
||||
status.setStatus("Disabling writes for close");
|
||||
try {
|
||||
if (this.isClosed()) {
|
||||
|
@ -1761,7 +1854,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
familyFiles.addAll(storeFiles.getSecond());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
|
||||
throw throwOnInterrupt(e);
|
||||
} catch (ExecutionException e) {
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof IOException) {
|
||||
|
@ -4514,6 +4607,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
MiniBatchOperationInProgress<Mutation> miniBatchOp = null;
|
||||
/** Keep track of the locks we hold so we can release them in finally clause */
|
||||
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.size());
|
||||
|
||||
// Check for thread interrupt status in case we have been signaled from
|
||||
// #interruptRegionOperation.
|
||||
checkInterrupt();
|
||||
|
||||
try {
|
||||
// STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with
|
||||
// locked rows
|
||||
|
@ -4527,20 +4625,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return;
|
||||
}
|
||||
|
||||
// Check for thread interrupt status in case we have been signaled from
|
||||
// #interruptRegionOperation. Do it before we take the lock and disable interrupts for
|
||||
// the WAL append.
|
||||
checkInterrupt();
|
||||
|
||||
lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
|
||||
locked = true;
|
||||
|
||||
// From this point until memstore update this operation should not be interrupted.
|
||||
disableInterrupts();
|
||||
|
||||
// STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp
|
||||
// We should record the timestamp only after we have acquired the rowLock,
|
||||
// otherwise, newer puts/deletes/increment/append are not guaranteed to have a newer
|
||||
// timestamp
|
||||
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);
|
||||
|
||||
// STEP 3. Build WAL edit
|
||||
|
||||
List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);
|
||||
|
||||
// STEP 4. Append the WALEdits to WAL and sync.
|
||||
|
||||
for(Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
|
||||
Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
|
||||
walEdit = nonceKeyWALEditPair.getSecond();
|
||||
|
@ -4576,6 +4685,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
releaseRowLocks(acquiredRowLocks);
|
||||
|
||||
enableInterrupts();
|
||||
|
||||
final int finalLastIndexExclusive =
|
||||
miniBatchOp != null ? miniBatchOp.getLastIndexExclusive() : batchOp.size();
|
||||
final boolean finalSuccess = success;
|
||||
|
@ -6539,13 +6650,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
success = true;
|
||||
return result;
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Thread interrupted waiting for lock on row: {}, in region {}", rowKey,
|
||||
getRegionInfo().getRegionNameAsString());
|
||||
InterruptedIOException iie = new InterruptedIOException();
|
||||
iie.initCause(ie);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Thread interrupted waiting for lock on row: {}, in region {}", rowKey,
|
||||
getRegionInfo().getRegionNameAsString());
|
||||
}
|
||||
TraceUtil.addTimelineAnnotation("Interrupted exception getting row lock");
|
||||
Thread.currentThread().interrupt();
|
||||
throw iie;
|
||||
throw throwOnInterrupt(ie);
|
||||
} catch (Error error) {
|
||||
// The maximum lock count for read lock is 64K (hardcoded), when this maximum count
|
||||
// is reached, it will throw out an Error. This Error needs to be caught so it can
|
||||
|
@ -7237,6 +7347,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// Scanning between column families and thus the scope is between cells
|
||||
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
|
||||
do {
|
||||
// Check for thread interrupt status in case we have been signaled from
|
||||
// #interruptRegionOperation.
|
||||
checkInterrupt();
|
||||
|
||||
// We want to maintain any progress that is made towards the limits while scanning across
|
||||
// different column families. To do this, we toggle the keep progress flag on during calls
|
||||
// to the StoreScanner to ensure that any progress made thus far is not wiped away.
|
||||
|
@ -7335,6 +7449,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
|
||||
// Check for thread interrupt status in case we have been signaled from
|
||||
// #interruptRegionOperation.
|
||||
checkInterrupt();
|
||||
|
||||
// Let's see what we have in the storeHeap.
|
||||
Cell current = this.storeHeap.peek();
|
||||
|
||||
|
@ -7415,6 +7533,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return true;
|
||||
}
|
||||
|
||||
// Check for thread interrupt status in case we have been signaled from
|
||||
// #interruptRegionOperation.
|
||||
checkInterrupt();
|
||||
|
||||
Cell nextKv = this.storeHeap.peek();
|
||||
shouldStop = shouldStop(nextKv);
|
||||
// save that the row was empty before filters applied to it.
|
||||
|
@ -7574,6 +7696,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
Cell next;
|
||||
while ((next = this.storeHeap.peek()) != null &&
|
||||
CellUtil.matchingRows(next, curRowCell)) {
|
||||
// Check for thread interrupt status in case we have been signaled from
|
||||
// #interruptRegionOperation.
|
||||
checkInterrupt();
|
||||
this.storeHeap.next(MOCKED_LIST);
|
||||
}
|
||||
resetFilters();
|
||||
|
@ -8237,6 +8362,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// when it assigns the edit a sequencedid (A.K.A the mvcc write number).
|
||||
WriteEntry writeEntry = null;
|
||||
MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
|
||||
|
||||
// Check for thread interrupt status in case we have been signaled from
|
||||
// #interruptRegionOperation.
|
||||
checkInterrupt();
|
||||
|
||||
try {
|
||||
boolean success = false;
|
||||
try {
|
||||
|
@ -8252,9 +8382,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
prevRowLock = rowLock;
|
||||
}
|
||||
}
|
||||
|
||||
// Check for thread interrupt status in case we have been signaled from
|
||||
// #interruptRegionOperation. Do it before we take the lock and disable interrupts for
|
||||
// the WAL append.
|
||||
checkInterrupt();
|
||||
|
||||
// STEP 3. Region lock
|
||||
lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
|
||||
locked = true;
|
||||
|
||||
// From this point until memstore update this operation should not be interrupted.
|
||||
disableInterrupts();
|
||||
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
// STEP 4. Let the processor scan the rows, generate mutations and add waledits
|
||||
doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
|
||||
|
@ -8320,6 +8460,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
// release locks if some were acquired but another timed out
|
||||
releaseRowLocks(acquiredRowLocks);
|
||||
|
||||
enableInterrupts();
|
||||
}
|
||||
|
||||
// 12. Run post-process hook
|
||||
|
@ -8382,6 +8524,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
rowProcessorExecutor.execute(task);
|
||||
try {
|
||||
task.get(timeout, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException ie) {
|
||||
throw throwOnInterrupt(ie);
|
||||
} catch (TimeoutException te) {
|
||||
String row = processor.getRowsToLock().isEmpty() ? "" :
|
||||
" on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
|
||||
|
@ -8477,11 +8621,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return writeEntry;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @return Sorted list of <code>cells</code> using <code>comparator</code>
|
||||
*/
|
||||
|
@ -8520,7 +8659,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
(2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
|
||||
(3 * ClassSize.ATOMIC_LONG) + // numPutsWithoutWAL, dataInMemoryWithoutWAL,
|
||||
// compactionsFailed
|
||||
(2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints
|
||||
(3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints, regionLockHolders
|
||||
WriteState.HEAP_SIZE + // writestate
|
||||
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
|
||||
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
|
||||
|
@ -8701,12 +8840,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
@Override
|
||||
public void startRegionOperation(Operation op) throws IOException {
|
||||
boolean isInterruptableOp = false;
|
||||
switch (op) {
|
||||
case GET: // read operations
|
||||
case GET: // interruptible read operations
|
||||
case SCAN:
|
||||
isInterruptableOp = true;
|
||||
checkReadsEnabled();
|
||||
break;
|
||||
default:
|
||||
case INCREMENT: // interruptible write operations
|
||||
case APPEND:
|
||||
case PUT:
|
||||
case DELETE:
|
||||
case BATCH_MUTATE:
|
||||
case CHECK_AND_MUTATE:
|
||||
isInterruptableOp = true;
|
||||
break;
|
||||
default: // all others
|
||||
break;
|
||||
}
|
||||
if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
|
||||
|
@ -8719,6 +8868,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
|
||||
}
|
||||
lock(lock.readLock());
|
||||
// Update regionLockHolders ONLY for any startRegionOperation call that is invoked from
|
||||
// an RPC handler
|
||||
Thread thisThread = Thread.currentThread();
|
||||
if (isInterruptableOp) {
|
||||
regionLockHolders.put(thisThread, true);
|
||||
}
|
||||
if (this.closed.get()) {
|
||||
lock.readLock().unlock();
|
||||
throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
|
||||
|
@ -8733,6 +8888,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
coprocessorHost.postStartRegionOperation(op);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (isInterruptableOp) {
|
||||
// would be harmless to remove what we didn't add but we know by 'isInterruptableOp'
|
||||
// if we added this thread to regionLockHolders
|
||||
regionLockHolders.remove(thisThread);
|
||||
}
|
||||
lock.readLock().unlock();
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
@ -8748,6 +8908,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
if (operation == Operation.SNAPSHOT) {
|
||||
stores.values().forEach(HStore::postSnapshotOperation);
|
||||
}
|
||||
Thread thisThread = Thread.currentThread();
|
||||
regionLockHolders.remove(thisThread);
|
||||
lock.readLock().unlock();
|
||||
if (coprocessorHost != null) {
|
||||
coprocessorHost.postCloseRegionOperation(operation);
|
||||
|
@ -8763,8 +8925,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* @throws RegionTooBusyException if failed to get the lock in time
|
||||
* @throws InterruptedIOException if interrupted while waiting for a lock
|
||||
*/
|
||||
private void startBulkRegionOperation(boolean writeLockNeeded)
|
||||
throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
|
||||
private void startBulkRegionOperation(boolean writeLockNeeded) throws IOException {
|
||||
if (this.closing.get()) {
|
||||
throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
|
||||
}
|
||||
|
@ -8775,6 +8936,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
else lock.readLock().unlock();
|
||||
throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
|
||||
}
|
||||
regionLockHolders.put(Thread.currentThread(), true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -8782,6 +8944,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* to the try block of #startRegionOperation
|
||||
*/
|
||||
private void closeBulkRegionOperation(){
|
||||
regionLockHolders.remove(Thread.currentThread());
|
||||
if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
|
||||
else lock.readLock().unlock();
|
||||
}
|
||||
|
@ -8812,7 +8975,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
dataInMemoryWithoutWAL.add(mutationSize);
|
||||
}
|
||||
|
||||
private void lock(final Lock lock) throws RegionTooBusyException, InterruptedIOException {
|
||||
private void lock(final Lock lock) throws IOException {
|
||||
lock(lock, 1);
|
||||
}
|
||||
|
||||
|
@ -8821,8 +8984,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* if failed to get the lock in time. Throw InterruptedIOException
|
||||
* if interrupted while waiting for the lock.
|
||||
*/
|
||||
private void lock(final Lock lock, final int multiplier)
|
||||
throws RegionTooBusyException, InterruptedIOException {
|
||||
private void lock(final Lock lock, final int multiplier) throws IOException {
|
||||
try {
|
||||
final long waitTime = Math.min(maxBusyWaitDuration,
|
||||
busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
|
||||
|
@ -8840,10 +9002,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
throw rtbe;
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("Interrupted while waiting for a lock in region {}", this);
|
||||
InterruptedIOException iie = new InterruptedIOException();
|
||||
iie.initCause(ie);
|
||||
throw iie;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Interrupted while waiting for a lock in region {}", this);
|
||||
}
|
||||
throw throwOnInterrupt(ie);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8971,6 +9133,67 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return getReadPoint(IsolationLevel.READ_COMMITTED);
|
||||
}
|
||||
|
||||
/**
|
||||
* If a handler thread is eligible for interrupt, make it ineligible. Should be paired
|
||||
* with {{@link #enableInterrupts()}.
|
||||
*/
|
||||
protected void disableInterrupts() {
|
||||
regionLockHolders.computeIfPresent(Thread.currentThread(), (t,b) -> false);
|
||||
}
|
||||
|
||||
/**
|
||||
* If a handler thread was made ineligible for interrupt via {{@link #disableInterrupts()},
|
||||
* make it eligible again. No-op if interrupts are already enabled.
|
||||
*/
|
||||
protected void enableInterrupts() {
|
||||
regionLockHolders.computeIfPresent(Thread.currentThread(), (t,b) -> true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Interrupt any region options that have acquired the region lock via
|
||||
* {@link #startRegionOperation(org.apache.hadoop.hbase.regionserver.Region.Operation)},
|
||||
* or {@link #startBulkRegionOperation(boolean)}.
|
||||
*/
|
||||
private void interruptRegionOperations() {
|
||||
for (Map.Entry<Thread, Boolean> entry: regionLockHolders.entrySet()) {
|
||||
// An entry in this map will have a boolean value indicating if it is currently
|
||||
// eligible for interrupt; if so, we should interrupt it.
|
||||
if (entry.getValue().booleanValue()) {
|
||||
entry.getKey().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check thread interrupt status and throw an exception if interrupted.
|
||||
* @throws NotServingRegionException if region is closing
|
||||
* @throws InterruptedIOException if interrupted but region is not closing
|
||||
*/
|
||||
// Package scope for tests
|
||||
void checkInterrupt() throws NotServingRegionException, InterruptedIOException {
|
||||
if (Thread.interrupted()) {
|
||||
if (this.closing.get()) {
|
||||
throw new NotServingRegionException(
|
||||
getRegionInfo().getRegionNameAsString() + " is closing");
|
||||
}
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throw the correct exception upon interrupt
|
||||
* @param t cause
|
||||
*/
|
||||
// Package scope for tests
|
||||
IOException throwOnInterrupt(Throwable t) {
|
||||
if (this.closing.get()) {
|
||||
return (NotServingRegionException) new NotServingRegionException(
|
||||
getRegionInfo().getRegionNameAsString() + " is closing")
|
||||
.initCause(t);
|
||||
}
|
||||
return (InterruptedIOException) new InterruptedIOException().initCause(t);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
|
@ -196,7 +196,8 @@ public interface Region extends ConfigurationObserver {
|
|||
*/
|
||||
enum Operation {
|
||||
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
|
||||
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT, COMPACT_SWITCH
|
||||
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT, COMPACT_SWITCH,
|
||||
CHECK_AND_MUTATE
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -2096,14 +2096,29 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
|||
/**
|
||||
* Create an HRegion that writes to the local tmp dirs with specified wal
|
||||
* @param info regioninfo
|
||||
* @param conf configuration
|
||||
* @param desc table descriptor
|
||||
* @param wal wal for this region.
|
||||
* @return created hregion
|
||||
* @throws IOException
|
||||
*/
|
||||
public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc, WAL wal)
|
||||
throws IOException {
|
||||
return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal);
|
||||
public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
|
||||
WAL wal) throws IOException {
|
||||
return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an HRegion that writes to the local tmp dirs with specified wal
|
||||
* @param info regioninfo
|
||||
* @param info configuration
|
||||
* @param desc table descriptor
|
||||
* @param wal wal for this region.
|
||||
* @return created hregion
|
||||
* @throws IOException
|
||||
*/
|
||||
public HRegion createLocalHRegion(HRegionInfo info, Configuration conf, HTableDescriptor desc,
|
||||
WAL wal) throws IOException {
|
||||
return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2127,9 +2142,8 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
|||
public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
|
||||
String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
|
||||
WAL wal, byte[]... families) throws IOException {
|
||||
return this
|
||||
.createLocalHRegion(TableName.valueOf(tableName), startKey, stopKey, isReadOnly, durability,
|
||||
wal, families);
|
||||
return createLocalHRegion(TableName.valueOf(tableName), startKey, stopKey, conf, isReadOnly,
|
||||
durability, wal, families);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2143,13 +2157,14 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
|||
* @throws IOException
|
||||
*/
|
||||
public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
|
||||
boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
|
||||
return createLocalHRegionWithInMemoryFlags(tableName,startKey, stopKey, isReadOnly,
|
||||
Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
|
||||
throws IOException {
|
||||
return createLocalHRegionWithInMemoryFlags(tableName,startKey, stopKey, conf, isReadOnly,
|
||||
durability, wal, null, families);
|
||||
}
|
||||
|
||||
public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
|
||||
byte[] stopKey,
|
||||
byte[] stopKey, Configuration conf,
|
||||
boolean isReadOnly, Durability durability, WAL wal, boolean[] compactedMemStore,
|
||||
byte[]... families)
|
||||
throws IOException {
|
||||
|
@ -2171,7 +2186,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
|
|||
}
|
||||
htd.setDurability(durability);
|
||||
HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false);
|
||||
return createLocalHRegion(info, htd, wal);
|
||||
return createLocalHRegion(info, conf, htd, wal);
|
||||
}
|
||||
|
||||
//
|
||||
|
|
|
@ -181,7 +181,7 @@ public class TestCacheOnWriteInSchema {
|
|||
RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
walFactory = new WALFactory(conf, id);
|
||||
|
||||
region = TEST_UTIL.createLocalHRegion(info, htd, walFactory.getWAL(info));
|
||||
region = TEST_UTIL.createLocalHRegion(info, conf, htd, walFactory.getWAL(info));
|
||||
region.setBlockCache(BlockCacheFactory.createBlockCache(conf));
|
||||
store = new HStore(region, hcd, conf, false);
|
||||
}
|
||||
|
|
|
@ -202,7 +202,7 @@ public class TestFailedAppendAndSync {
|
|||
boolean threwOnAppend = false;
|
||||
boolean threwOnBoth = false;
|
||||
|
||||
HRegion region = initHRegion(tableName, null, null, dodgyWAL);
|
||||
HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL);
|
||||
try {
|
||||
// Get some random bytes.
|
||||
byte[] value = Bytes.toBytes(getName());
|
||||
|
@ -311,11 +311,11 @@ public class TestFailedAppendAndSync {
|
|||
* @return A region on which you must call
|
||||
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
|
||||
*/
|
||||
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
|
||||
throws IOException {
|
||||
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
|
||||
Configuration conf, WAL wal) throws IOException {
|
||||
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
|
||||
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
|
||||
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
|
||||
wal, COLUMN_FAMILY_BYTES);
|
||||
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, conf, false,
|
||||
Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.atLeast;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
|
@ -139,6 +141,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
|||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
||||
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
|
||||
import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
|
||||
|
@ -358,7 +361,7 @@ public class TestHRegion {
|
|||
FileSystem fs = FileSystem.get(CONF);
|
||||
Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
|
||||
MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
|
||||
region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
|
||||
region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, faultyLog,
|
||||
COLUMN_FAMILY_BYTES);
|
||||
|
||||
HStore store = region.getStore(COLUMN_FAMILY_BYTES);
|
||||
|
@ -406,7 +409,7 @@ public class TestHRegion {
|
|||
Path rootDir = new Path(dir + testName);
|
||||
FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
|
||||
hLog.init();
|
||||
HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
|
||||
HRegion region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, hLog,
|
||||
COLUMN_FAMILY_BYTES);
|
||||
HStore store = region.getStore(COLUMN_FAMILY_BYTES);
|
||||
assertEquals(0, region.getMemStoreDataSize());
|
||||
|
@ -505,7 +508,7 @@ public class TestHRegion {
|
|||
HRegion region = null;
|
||||
try {
|
||||
// Initialize region
|
||||
region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
|
||||
region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, wal,
|
||||
COLUMN_FAMILY_BYTES);
|
||||
long size = region.getMemStoreDataSize();
|
||||
Assert.assertEquals(0, size);
|
||||
|
@ -570,7 +573,7 @@ public class TestHRegion {
|
|||
HRegion region = null;
|
||||
try {
|
||||
// Initialize region
|
||||
region = initHRegion(tableName, null, null, false,
|
||||
region = initHRegion(tableName, null, null, CONF, false,
|
||||
Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
|
||||
long size = region.getMemStoreDataSize();
|
||||
Assert.assertEquals(0, size);
|
||||
|
@ -1060,7 +1063,7 @@ public class TestHRegion {
|
|||
final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build());
|
||||
|
||||
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
|
||||
HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
|
||||
HConstants.EMPTY_END_ROW, CONF, false, Durability.USE_DEFAULT, wal, family);
|
||||
try {
|
||||
Path regiondir = region.getRegionFileSystem().getRegionDir();
|
||||
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||
|
@ -1265,7 +1268,7 @@ public class TestHRegion {
|
|||
CommonFSUtils.getRootDir(walConf), method, walConf);
|
||||
wal.init();
|
||||
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
|
||||
HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
|
||||
HConstants.EMPTY_END_ROW, CONF, false, Durability.USE_DEFAULT, wal, family);
|
||||
int i = 0;
|
||||
Put put = new Put(Bytes.toBytes(i));
|
||||
put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
|
||||
|
@ -1296,7 +1299,7 @@ public class TestHRegion {
|
|||
method, walConf);
|
||||
wal.init();
|
||||
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
|
||||
HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
|
||||
HConstants.EMPTY_END_ROW, CONF, false, Durability.USE_DEFAULT, wal, family);
|
||||
region.put(put);
|
||||
// 3. Test case where ABORT_FLUSH will throw exception.
|
||||
// Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
|
||||
|
@ -3245,7 +3248,7 @@ public class TestHRegion {
|
|||
hLog.init();
|
||||
// This chunk creation is done throughout the code base. Do we want to move it into core?
|
||||
// It is missing from this test. W/o it we NPE.
|
||||
region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
|
||||
region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, hLog,
|
||||
COLUMN_FAMILY_BYTES);
|
||||
|
||||
Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
|
||||
|
@ -3505,7 +3508,7 @@ public class TestHRegion {
|
|||
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
|
||||
Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
|
||||
final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
|
||||
this.region = TEST_UTIL.createLocalHRegion(info, htd, wal);
|
||||
this.region = TEST_UTIL.createLocalHRegion(info, CONF, htd, wal);
|
||||
|
||||
// Put 4 version to memstore
|
||||
long ts = 0;
|
||||
|
@ -5387,7 +5390,7 @@ public class TestHRegion {
|
|||
final WALFactory wals = new WALFactory(walConf, TEST_UTIL.getRandomUUID().toString());
|
||||
final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()));
|
||||
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
|
||||
HConstants.EMPTY_END_ROW, false, tableDurability, wal,
|
||||
HConstants.EMPTY_END_ROW, CONF, false, tableDurability, wal,
|
||||
new byte[][] { family });
|
||||
|
||||
Put put = new Put(Bytes.toBytes("r1"));
|
||||
|
@ -5755,7 +5758,7 @@ public class TestHRegion {
|
|||
Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
|
||||
HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
|
||||
final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
|
||||
return initHRegion(tableName, startKey, stopKey, isReadOnly,
|
||||
return initHRegion(tableName, startKey, stopKey, conf, isReadOnly,
|
||||
Durability.SYNC_WAL, wal, families);
|
||||
}
|
||||
|
||||
|
@ -5764,11 +5767,12 @@ public class TestHRegion {
|
|||
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
|
||||
*/
|
||||
public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
|
||||
boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
|
||||
Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
|
||||
byte[]... families) throws IOException {
|
||||
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
|
||||
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
|
||||
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
|
||||
isReadOnly, durability, wal, families);
|
||||
conf, isReadOnly, durability, wal, families);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -6035,9 +6039,9 @@ public class TestHRegion {
|
|||
byte[] col1 = Bytes.toBytes("col1");
|
||||
byte[] col2 = Bytes.toBytes("col2");
|
||||
long ts = 1;
|
||||
HBaseConfiguration config = new HBaseConfiguration();
|
||||
config.setInt("test.block.size", 1);
|
||||
this.region = initHRegion(tableName, method, config, families);
|
||||
Configuration conf = new Configuration(CONF);
|
||||
conf.setInt("test.block.size", 1);
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
|
||||
KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
|
||||
KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
|
||||
|
@ -6115,7 +6119,7 @@ public class TestHRegion {
|
|||
byte[][] families = { cf1, cf2, cf3 };
|
||||
byte[] col = Bytes.toBytes("C");
|
||||
long ts = 1;
|
||||
HBaseConfiguration conf = new HBaseConfiguration();
|
||||
Configuration conf = new Configuration(CONF);
|
||||
// disable compactions in this test.
|
||||
conf.setInt("hbase.hstore.compactionThreshold", 10000);
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
|
@ -6277,7 +6281,7 @@ public class TestHRegion {
|
|||
byte[][] families = { cf1, cf2, cf3, cf4 };
|
||||
byte[] col = Bytes.toBytes("C");
|
||||
long ts = 1;
|
||||
HBaseConfiguration conf = new HBaseConfiguration();
|
||||
Configuration conf = new Configuration(CONF);
|
||||
// disable compactions in this test.
|
||||
conf.setInt("hbase.hstore.compactionThreshold", 10000);
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
|
@ -6343,7 +6347,7 @@ public class TestHRegion {
|
|||
byte[] cf1 = Bytes.toBytes("CF1");
|
||||
byte[][] families = {cf1};
|
||||
byte[] col = Bytes.toBytes("C");
|
||||
HBaseConfiguration conf = new HBaseConfiguration();
|
||||
Configuration conf = new Configuration(CONF);
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
// setup with one storefile and one memstore, to create scanner and get an earlier readPt
|
||||
Put put = new Put(Bytes.toBytes("19998"));
|
||||
|
@ -6392,8 +6396,7 @@ public class TestHRegion {
|
|||
byte[] cf1 = Bytes.toBytes("CF1");
|
||||
byte[][] families = { cf1 };
|
||||
byte[] col = Bytes.toBytes("C");
|
||||
HBaseConfiguration conf = new HBaseConfiguration();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
// setup with one storefile and one memstore, to create scanner and get an earlier readPt
|
||||
Put put = new Put(Bytes.toBytes("19996"));
|
||||
put.addColumn(cf1, col, Bytes.toBytes("val"));
|
||||
|
@ -6445,8 +6448,7 @@ public class TestHRegion {
|
|||
byte[][] families = { cf1 };
|
||||
byte[] col = Bytes.toBytes("C");
|
||||
|
||||
HBaseConfiguration conf = new HBaseConfiguration();
|
||||
this.region = initHRegion(tableName, method, conf, families);
|
||||
this.region = initHRegion(tableName, method, CONF, families);
|
||||
|
||||
Put put = new Put(Bytes.toBytes("199996"));
|
||||
put.addColumn(cf1, col, Bytes.toBytes("val"));
|
||||
|
@ -7345,4 +7347,226 @@ public class TestHRegion {
|
|||
return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseNoInterrupt() throws Exception {
|
||||
byte[] cf1 = Bytes.toBytes("CF1");
|
||||
byte[][] families = { cf1 };
|
||||
final int SLEEP_TIME = 10 * 1000;
|
||||
|
||||
Configuration conf = new Configuration(CONF);
|
||||
// Disable close thread interrupt and server abort behavior
|
||||
conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, false);
|
||||
conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, 1000);
|
||||
region = initHRegion(tableName, method, conf, families);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicBoolean holderInterrupted = new AtomicBoolean();
|
||||
Thread holder = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LOG.info("Starting region operation holder");
|
||||
region.startRegionOperation(Operation.SCAN);
|
||||
latch.countDown();
|
||||
try {
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted");
|
||||
holderInterrupted.set(true);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
try {
|
||||
region.closeRegionOperation();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
LOG.info("Stopped region operation holder");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
holder.start();
|
||||
latch.await();
|
||||
region.close();
|
||||
region = null;
|
||||
holder.join();
|
||||
|
||||
assertFalse("Region lock holder should not have been interrupted", holderInterrupted.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseInterrupt() throws Exception {
|
||||
byte[] cf1 = Bytes.toBytes("CF1");
|
||||
byte[][] families = { cf1 };
|
||||
final int SLEEP_TIME = 10 * 1000;
|
||||
|
||||
Configuration conf = new Configuration(CONF);
|
||||
// Enable close thread interrupt and server abort behavior
|
||||
conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
|
||||
// Speed up the unit test, no need to wait default 10 seconds.
|
||||
conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, 1000);
|
||||
region = initHRegion(tableName, method, conf, families);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicBoolean holderInterrupted = new AtomicBoolean();
|
||||
Thread holder = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LOG.info("Starting region operation holder");
|
||||
region.startRegionOperation(Operation.SCAN);
|
||||
latch.countDown();
|
||||
try {
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted");
|
||||
holderInterrupted.set(true);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
try {
|
||||
region.closeRegionOperation();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
LOG.info("Stopped region operation holder");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
holder.start();
|
||||
latch.await();
|
||||
region.close();
|
||||
region = null;
|
||||
holder.join();
|
||||
|
||||
assertTrue("Region lock holder was not interrupted", holderInterrupted.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseAbort() throws Exception {
|
||||
byte[] cf1 = Bytes.toBytes("CF1");
|
||||
byte[][] families = { cf1 };
|
||||
final int SLEEP_TIME = 10 * 1000;
|
||||
|
||||
Configuration conf = new Configuration(CONF);
|
||||
// Enable close thread interrupt and server abort behavior.
|
||||
conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
|
||||
// Set the abort interval to a fraction of sleep time so we are guaranteed to be aborted.
|
||||
conf.setInt(HRegion.CLOSE_WAIT_TIME, SLEEP_TIME / 2);
|
||||
// Set the wait interval to a fraction of sleep time so we are guaranteed to be interrupted.
|
||||
conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, SLEEP_TIME / 4);
|
||||
region = initHRegion(tableName, method, conf, families);
|
||||
RegionServerServices rsServices = mock(RegionServerServices.class);
|
||||
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost", 1000, 1000));
|
||||
region.rsServices = rsServices;
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Thread holder = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LOG.info("Starting region operation holder");
|
||||
region.startRegionOperation(Operation.SCAN);
|
||||
latch.countDown();
|
||||
// Hold the lock for SLEEP_TIME seconds no matter how many times we are interrupted.
|
||||
int timeRemaining = SLEEP_TIME;
|
||||
while (timeRemaining > 0) {
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
try {
|
||||
Thread.sleep(timeRemaining);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted");
|
||||
}
|
||||
long end = EnvironmentEdgeManager.currentTime();
|
||||
timeRemaining -= end - start;
|
||||
if (timeRemaining < 0) {
|
||||
timeRemaining = 0;
|
||||
}
|
||||
if (timeRemaining > 0) {
|
||||
LOG.info("Sleeping again, remaining time " + timeRemaining + " ms");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
try {
|
||||
region.closeRegionOperation();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
LOG.info("Stopped region operation holder");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
holder.start();
|
||||
latch.await();
|
||||
try {
|
||||
region.close();
|
||||
} catch (IOException e) {
|
||||
LOG.info("Caught expected exception", e);
|
||||
}
|
||||
region = null;
|
||||
holder.join();
|
||||
|
||||
// Verify the region tried to abort the server
|
||||
verify(rsServices, atLeast(1)).abort(anyString(),any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterruptProtection() throws Exception {
|
||||
byte[] cf1 = Bytes.toBytes("CF1");
|
||||
byte[][] families = { cf1 };
|
||||
final int SLEEP_TIME = 10 * 1000;
|
||||
|
||||
Configuration conf = new Configuration(CONF);
|
||||
// Enable close thread interrupt and server abort behavior.
|
||||
conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
|
||||
conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, 1000);
|
||||
region = initHRegion(tableName, method, conf, families);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicBoolean holderInterrupted = new AtomicBoolean();
|
||||
Thread holder = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
LOG.info("Starting region operation holder");
|
||||
region.startRegionOperation(Operation.SCAN);
|
||||
LOG.info("Protecting against interrupts");
|
||||
region.disableInterrupts();
|
||||
try {
|
||||
latch.countDown();
|
||||
try {
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Interrupted");
|
||||
holderInterrupted.set(true);
|
||||
}
|
||||
} finally {
|
||||
region.enableInterrupts();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
try {
|
||||
region.closeRegionOperation();
|
||||
} catch (IOException e) {
|
||||
}
|
||||
LOG.info("Stopped region operation holder");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
holder.start();
|
||||
latch.await();
|
||||
region.close();
|
||||
region = null;
|
||||
holder.join();
|
||||
|
||||
assertFalse("Region lock holder should not have been interrupted", holderInterrupted.get());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -58,7 +60,8 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion {
|
|||
*/
|
||||
@Override
|
||||
public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
|
||||
boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
|
||||
Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
|
||||
throws IOException {
|
||||
boolean[] inMemory = new boolean[families.length];
|
||||
for(int i = 0; i < inMemory.length; i++) {
|
||||
inMemory[i] = true;
|
||||
|
@ -66,7 +69,7 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion {
|
|||
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
|
||||
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
|
||||
return TEST_UTIL.createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey,
|
||||
isReadOnly, durability, wal, inMemory, families);
|
||||
conf, isReadOnly, durability, wal, inMemory, families);
|
||||
}
|
||||
|
||||
@Override int getTestCountForTestWritesWhileScanning() {
|
||||
|
|
|
@ -0,0 +1,362 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNameTestRule;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({RegionServerTests.class, LargeTests.class})
|
||||
public class TestRegionInterrupt {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionInterrupt.class);
|
||||
|
||||
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestRegionInterrupt.class);
|
||||
|
||||
static final byte[] FAMILY = Bytes.toBytes("info");
|
||||
|
||||
static long sleepTime;
|
||||
|
||||
@Rule
|
||||
public TableNameTestRule name = new TableNameTestRule();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
|
||||
conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class, Region.class);
|
||||
conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
|
||||
// Ensure the sleep interval is long enough for interrupts to occur.
|
||||
long waitInterval = conf.getLong(HRegion.CLOSE_WAIT_INTERVAL,
|
||||
HRegion.DEFAULT_CLOSE_WAIT_INTERVAL);
|
||||
sleepTime = waitInterval * 2;
|
||||
// Try to bound the running time of this unit if expected actions do not take place.
|
||||
conf.setLong(HRegion.CLOSE_WAIT_TIME, sleepTime * 2);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
TEST_UTIL.startMiniCluster();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseInterruptScanning() throws Exception {
|
||||
final TableName tableName = name.getTableName();
|
||||
LOG.info("Creating table " + tableName);
|
||||
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
|
||||
// load some data
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
|
||||
TEST_UTIL.loadTable(table, FAMILY);
|
||||
final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
|
||||
// scan the table in the background
|
||||
Thread scanner = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Scan scan = new Scan();
|
||||
scan.addFamily(FAMILY);
|
||||
scan.setFilter(new DelayingFilter());
|
||||
try {
|
||||
LOG.info("Starting scan");
|
||||
try (ResultScanner rs = table.getScanner(scan)) {
|
||||
Result r;
|
||||
do {
|
||||
r = rs.next();
|
||||
if (r != null) {
|
||||
LOG.info("Scanned row " + Bytes.toStringBinary(r.getRow()));
|
||||
}
|
||||
} while (r != null);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("Scanner caught exception", e);
|
||||
expectedExceptionCaught.set(true);
|
||||
} finally {
|
||||
LOG.info("Finished scan");
|
||||
}
|
||||
}
|
||||
});
|
||||
scanner.start();
|
||||
|
||||
// Wait for the filter to begin sleeping
|
||||
LOG.info("Waiting for scanner to start");
|
||||
Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return DelayingFilter.isSleeping();
|
||||
}
|
||||
});
|
||||
|
||||
// Offline the table, this will trigger closing
|
||||
LOG.info("Offlining table " + tableName);
|
||||
TEST_UTIL.getHBaseAdmin().disableTable(tableName);
|
||||
|
||||
// Wait for scanner termination
|
||||
scanner.join();
|
||||
|
||||
// When we get here the region has closed and the table is offline
|
||||
assertTrue("Region operations were not interrupted",
|
||||
InterruptInterceptingHRegion.wasInterrupted());
|
||||
assertTrue("Scanner did not catch expected exception", expectedExceptionCaught.get());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseInterruptMutation() throws Exception {
|
||||
final TableName tableName = name.getTableName();
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
// Create the test table
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
htd.addFamily(new HColumnDescriptor(FAMILY));
|
||||
htd.addCoprocessor(MutationDelayingCoprocessor.class.getName());
|
||||
LOG.info("Creating table " + tableName);
|
||||
admin.createTable(htd);
|
||||
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
|
||||
|
||||
// Insert some data in the background
|
||||
LOG.info("Starting writes to table " + tableName);
|
||||
final int NUM_ROWS = 100;
|
||||
final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
|
||||
Thread inserter = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try (BufferedMutator t = admin.getConnection().getBufferedMutator(tableName)) {
|
||||
for (int i = 0; i < NUM_ROWS; i++) {
|
||||
LOG.info("Writing row " + i + " to " + tableName);
|
||||
byte[] value = new byte[10], row = Bytes.toBytes(Integer.toString(i));
|
||||
Bytes.random(value);
|
||||
t.mutate(new Put(row).addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, value));
|
||||
t.flush();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("Inserter caught exception", e);
|
||||
expectedExceptionCaught.set(true);
|
||||
}
|
||||
}
|
||||
});
|
||||
inserter.start();
|
||||
|
||||
// Wait for delayed insertion to begin
|
||||
LOG.info("Waiting for mutations to start");
|
||||
Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return MutationDelayingCoprocessor.isSleeping();
|
||||
}
|
||||
});
|
||||
|
||||
// Offline the table, this will trigger closing
|
||||
LOG.info("Offlining table " + tableName);
|
||||
admin.disableTable(tableName);
|
||||
|
||||
// Wait for the inserter to finish
|
||||
inserter.join();
|
||||
|
||||
// When we get here the region has closed and the table is offline
|
||||
assertTrue("Region operations were not interrupted",
|
||||
InterruptInterceptingHRegion.wasInterrupted());
|
||||
assertTrue("Inserter did not catch expected exception", expectedExceptionCaught.get());
|
||||
|
||||
}
|
||||
|
||||
public static class InterruptInterceptingHRegion extends HRegion {
|
||||
|
||||
private static boolean interrupted = false;
|
||||
|
||||
public static boolean wasInterrupted() {
|
||||
return interrupted;
|
||||
}
|
||||
|
||||
public InterruptInterceptingHRegion(Path tableDir, WAL wal, FileSystem fs,
|
||||
Configuration conf, RegionInfo regionInfo, TableDescriptor htd,
|
||||
RegionServerServices rsServices) {
|
||||
super(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
|
||||
}
|
||||
|
||||
public InterruptInterceptingHRegion(HRegionFileSystem fs, WAL wal, Configuration conf,
|
||||
TableDescriptor htd, RegionServerServices rsServices) {
|
||||
super(fs, wal, conf, htd, rsServices);
|
||||
}
|
||||
|
||||
@Override
|
||||
void checkInterrupt() throws NotServingRegionException, InterruptedIOException {
|
||||
try {
|
||||
super.checkInterrupt();
|
||||
} catch (NotServingRegionException | InterruptedIOException e) {
|
||||
interrupted = true;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
IOException throwOnInterrupt(Throwable t) {
|
||||
interrupted = true;
|
||||
return super.throwOnInterrupt(t);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class DelayingFilter extends FilterBase {
|
||||
|
||||
static volatile boolean sleeping = false;
|
||||
|
||||
public static boolean isSleeping() {
|
||||
return sleeping;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReturnCode filterCell(Cell v) throws IOException {
|
||||
LOG.info("Starting sleep on " + v);
|
||||
sleeping = true;
|
||||
try {
|
||||
Thread.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
// restore interrupt status so region scanner can handle it as expected
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.info("Interrupted during sleep on " + v);
|
||||
} finally {
|
||||
LOG.info("Done sleep on " + v);
|
||||
sleeping = false;
|
||||
}
|
||||
return ReturnCode.INCLUDE;
|
||||
}
|
||||
|
||||
public static DelayingFilter parseFrom(final byte [] pbBytes)
|
||||
throws DeserializationException {
|
||||
// Just return a new instance.
|
||||
return new DelayingFilter();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class MutationDelayingCoprocessor implements RegionCoprocessor, RegionObserver {
|
||||
|
||||
static volatile boolean sleeping = false;
|
||||
|
||||
public static boolean isSleeping() {
|
||||
return sleeping;
|
||||
}
|
||||
|
||||
private void doSleep(Region.Operation op) {
|
||||
LOG.info("Starting sleep for " + op);
|
||||
sleeping = true;
|
||||
try {
|
||||
Thread.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
// restore interrupt status so doMiniBatchMutation etc. can handle it as expected
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.info("Interrupted during " + op);
|
||||
} finally {
|
||||
LOG.info("Done");
|
||||
sleeping = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
|
||||
Durability durability) throws IOException {
|
||||
doSleep(Region.Operation.PUT);
|
||||
RegionObserver.super.prePut(c, put, edit, durability);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
|
||||
WALEdit edit, Durability durability) throws IOException {
|
||||
doSleep(Region.Operation.DELETE);
|
||||
RegionObserver.super.preDelete(c, delete, edit, durability);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
|
||||
throws IOException {
|
||||
doSleep(Region.Operation.APPEND);
|
||||
return RegionObserver.super.preAppend(c, append);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
|
||||
throws IOException {
|
||||
doSleep(Region.Operation.INCREMENT);
|
||||
return RegionObserver.super.preIncrement(c, increment);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -226,7 +226,7 @@ public class TestWALLockup {
|
|||
// There is no 'stop' once a logRoller is running.. it just dies.
|
||||
logRoller.start();
|
||||
// Now get a region and start adding in edits.
|
||||
final HRegion region = initHRegion(tableName, null, null, dodgyWAL);
|
||||
final HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL);
|
||||
byte [] bytes = Bytes.toBytes(getName());
|
||||
NavigableMap<byte[], Integer> scopes = new TreeMap<>(
|
||||
Bytes.BYTES_COMPARATOR);
|
||||
|
@ -558,11 +558,11 @@ public class TestWALLockup {
|
|||
* @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
|
||||
* when done.
|
||||
*/
|
||||
private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
|
||||
throws IOException {
|
||||
private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
|
||||
Configuration conf, WAL wal) throws IOException {
|
||||
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
|
||||
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
|
||||
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
|
||||
wal, COLUMN_FAMILY_BYTES);
|
||||
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, conf, false,
|
||||
Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -555,7 +555,7 @@ public abstract class AbstractTestFSWAL {
|
|||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
|
||||
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
|
||||
TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
|
||||
TEST_UTIL.createLocalHRegion(hri, CONF, htd, wal).close();
|
||||
RegionServerServices rsServices = mock(RegionServerServices.class);
|
||||
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
|
||||
when(rsServices.getConfiguration()).thenReturn(conf);
|
||||
|
|
|
@ -168,7 +168,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
|
|||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
|
||||
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
|
||||
final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
|
||||
final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log);
|
||||
ExecutorService exec = Executors.newFixedThreadPool(2);
|
||||
|
||||
// do a regular write first because of memstore size calculation.
|
||||
|
|
|
@ -89,7 +89,7 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
|
|||
FileSystem fs = FileSystem.get(conf);
|
||||
Path rootDir = new Path(dir + getName());
|
||||
T wal = getWAL(fs, rootDir, getName(), conf);
|
||||
HRegion region = initHRegion(tableName, null, null, wal);
|
||||
HRegion region = initHRegion(tableName, null, null, conf, wal);
|
||||
try {
|
||||
resetSyncFlag(wal);
|
||||
assertNull(getSyncFlag(wal));
|
||||
|
@ -114,7 +114,7 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
|
|||
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
|
||||
fs = FileSystem.get(conf);
|
||||
wal = getWAL(fs, rootDir, getName(), conf);
|
||||
region = initHRegion(tableName, null, null, wal);
|
||||
region = initHRegion(tableName, null, null, conf, wal);
|
||||
|
||||
try {
|
||||
resetSyncFlag(wal);
|
||||
|
@ -156,11 +156,11 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
|
|||
* @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
|
||||
* when done.
|
||||
*/
|
||||
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
|
||||
throws IOException {
|
||||
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
|
||||
Configuration conf, WAL wal) throws IOException {
|
||||
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
|
||||
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
|
||||
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.USE_DEFAULT,
|
||||
wal, COLUMN_FAMILY_BYTES);
|
||||
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, conf, false,
|
||||
Durability.USE_DEFAULT, wal, COLUMN_FAMILY_BYTES);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue