HBASE-25212 Optionally abort requests in progress after deciding a region should close (#2574)

If hbase.regionserver.close.wait.abort is set to true, interrupt RPC
handler threads holding the region close lock.

Until requests in progress can be aborted, wait on the region close lock for
a configurable interval (specified by hbase.regionserver.close.wait.time.ms,
default 60000 ms (1 minute)). If the close lock still has not been acquired
after this interval elapses, and if aborting is allowed (also controlled by
hbase.regionserver.close.wait.abort), abort the regionserver.

We will attempt to interrupt any running handlers every
hbase.regionserver.close.wait.interval.ms (default 10000 ms (10 seconds)) until
either the close lock is acquired or we reach the maximum wait time.
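
These settings are ordinary site configuration. As a minimal sketch (not part of
this patch; the class name is illustrative) of wiring them up programmatically in
a test or embedded setup, using the HRegion constants this commit introduces and
simply restating the documented defaults:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.HRegion;

public class CloseWaitConfigSketch {
  public static Configuration configure() {
    Configuration conf = HBaseConfiguration.create();
    // Permit interrupting in-flight handlers and aborting the regionserver if the
    // close lock cannot be acquired in time.
    conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);   // hbase.regionserver.close.wait.abort
    conf.setLong(HRegion.CLOSE_WAIT_TIME, 60000L);     // total wait in ms before giving up/aborting
    conf.setLong(HRegion.CLOSE_WAIT_INTERVAL, 10000L); // ms between interrupt attempts
    return conf;
  }
}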

Define a subset of region operations as interruptible. Track threads holding
the close lock while transiting those operations. Set the thread interrupt
status of tracked threads when trying to close the region, and use that
interrupt status, where safe, to break out of request processing.
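
As a compact, self-contained illustration of that tracking pattern (a sketch only;
the authoritative version is the HRegion code in the diff below, whose names
regionLockHolders, disableInterrupts, enableInterrupts and interruptRegionOperations
it mirrors):

import java.util.concurrent.ConcurrentHashMap;

public class InterruptTrackingSketch {
  // true = holder may be interrupted; false = holder is inside a critical section
  private final ConcurrentHashMap<Thread, Boolean> regionLockHolders = new ConcurrentHashMap<>();

  void runInterruptibleOperation(Runnable criticalSection) {
    Thread self = Thread.currentThread();
    regionLockHolders.put(self, Boolean.TRUE);      // startRegionOperation()
    try {
      // ... interruptible work (row lock waits, scan loops) happens here ...
      regionLockHolders.computeIfPresent(self, (t, b) -> Boolean.FALSE); // disableInterrupts()
      try {
        criticalSection.run();                      // e.g. WAL append and memstore update
      } finally {
        regionLockHolders.computeIfPresent(self, (t, b) -> Boolean.TRUE); // enableInterrupts()
      }
    } finally {
      regionLockHolders.remove(self);               // closeRegionOperation()
    }
  }

  // Called by the closing thread every hbase.regionserver.close.wait.interval.ms
  // while the close (write) lock remains unavailable.
  void interruptRegionOperations() {
    regionLockHolders.forEach((thread, interruptible) -> {
      if (interruptible) {
        thread.interrupt();
      }
    });
  }
}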

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Reid Chan <reidchan@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Andrew Purtell authored on 2020-11-03 15:20:27 -08:00, committed by GitHub
parent 1eceab69b5
commit c98e993b23
14 changed files with 912 additions and 96 deletions


@ -688,7 +688,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Last flush time for each Store. Useful when we are flushing for each column
private final ConcurrentMap<HStore, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
final RegionServerServices rsServices;
protected RegionServerServices rsServices;
private RegionServerAccounting rsAccounting;
private long flushCheckInterval;
// flushPerChanges is to prevent too many changes in memstore
@ -696,6 +696,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private long blockingMemStoreSize;
// Used to guard closes
final ReentrantReadWriteLock lock;
// Used to track interruptible holders of the region lock. Currently that is only RPC handler
// threads. Boolean value in map determines if lock holder can be interrupted, normally true,
// but may be false when thread is transiting a critical section.
final ConcurrentHashMap<Thread, Boolean> regionLockHolders;
// Stop updates lock
private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
@ -788,6 +792,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
MetaCellComparator.META_COMPARATOR : CellComparatorImpl.COMPARATOR;
this.lock = new ReentrantReadWriteLock(conf.getBoolean(FAIR_REENTRANT_CLOSE_LOCK,
DEFAULT_FAIR_REENTRANT_CLOSE_LOCK));
this.regionLockHolders = new ConcurrentHashMap<>();
this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
DEFAULT_CACHE_FLUSH_INTERVAL);
this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
@ -1174,7 +1179,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
LOG.info("Setting FlushNonSloppyStoresFirstPolicy for the region=" + this);
}
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
throw throwOnInterrupt(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} finally {
@ -1578,6 +1583,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
public static final String CLOSE_WAIT_ABORT = "hbase.regionserver.close.wait.abort";
public static final boolean DEFAULT_CLOSE_WAIT_ABORT = true;
public static final String CLOSE_WAIT_TIME = "hbase.regionserver.close.wait.time.ms";
public static final long DEFAULT_CLOSE_WAIT_TIME = 60000; // 1 minute
public static final String CLOSE_WAIT_INTERVAL = "hbase.regionserver.close.wait.interval.ms";
public static final long DEFAULT_CLOSE_WAIT_INTERVAL = 10000; // 10 seconds
public Map<byte[], List<HStoreFile>> close(boolean abort) throws IOException {
return close(abort, false);
}
@ -1679,22 +1691,103 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
if (timeoutForWriteLock == null
|| timeoutForWriteLock == Long.MAX_VALUE) {
// block waiting for the lock for closing
lock.writeLock().lock(); // FindBugs: Complains UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine
} else {
try {
boolean succeed = lock.writeLock().tryLock(timeoutForWriteLock, TimeUnit.SECONDS);
if (!succeed) {
throw new IOException("Failed to get write lock when closing region");
}
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}
// Set the closing flag
// From this point new arrivals at the region lock will get NSRE.
this.closing.set(true);
LOG.info("Closing region {}", this);
// Acquire the close lock
// The configuration parameter CLOSE_WAIT_ABORT is overloaded to enable both
// the new regionserver abort condition and interrupts for running requests.
// If CLOSE_WAIT_ABORT is not enabled there is no change from earlier behavior,
// we will not attempt to interrupt threads servicing requests nor crash out
// the regionserver if something remains stubborn.
final boolean canAbort = conf.getBoolean(CLOSE_WAIT_ABORT, DEFAULT_CLOSE_WAIT_ABORT);
boolean useTimedWait = false;
if (timeoutForWriteLock != null && timeoutForWriteLock != Long.MAX_VALUE) {
// convert legacy use of timeoutForWriteLock in seconds to new use in millis
timeoutForWriteLock = TimeUnit.SECONDS.toMillis(timeoutForWriteLock);
useTimedWait = true;
} else if (canAbort) {
timeoutForWriteLock = conf.getLong(CLOSE_WAIT_TIME, DEFAULT_CLOSE_WAIT_TIME);
useTimedWait = true;
}
if (LOG.isDebugEnabled()) {
LOG.debug((useTimedWait ? "Time limited wait" : "Waiting without time limit") +
" for close lock on " + this);
}
final long closeWaitInterval = conf.getLong(CLOSE_WAIT_INTERVAL, DEFAULT_CLOSE_WAIT_INTERVAL);
long elapsedWaitTime = 0;
if (useTimedWait) {
// Sanity check configuration
long remainingWaitTime = timeoutForWriteLock;
if (remainingWaitTime < closeWaitInterval) {
LOG.warn("Time limit for close wait of " + timeoutForWriteLock +
" ms is less than the configured lock acquisition wait interval " +
closeWaitInterval + " ms, using wait interval as time limit");
remainingWaitTime = closeWaitInterval;
}
boolean acquired = false;
do {
long start = EnvironmentEdgeManager.currentTime();
try {
acquired = lock.writeLock().tryLock(Math.min(remainingWaitTime, closeWaitInterval),
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Interrupted waiting for close lock. More likely the server is shutting down, not
// normal operation, so aborting upon interrupt while waiting on this lock would not
// provide much value. Throw an IOE (as IIOE) like we would in the case where we
// fail to acquire the lock.
String msg = "Interrupted while waiting for close lock on " + this;
LOG.warn(msg, e);
throw (InterruptedIOException) new InterruptedIOException(msg).initCause(e);
}
long elapsed = EnvironmentEdgeManager.currentTime() - start;
elapsedWaitTime += elapsed;
remainingWaitTime -= elapsed;
if (canAbort && !acquired && remainingWaitTime > 0) {
// Before we loop to wait again, interrupt all region operations that might
// still be in progress, to encourage them to break out of waiting states or
// inner loops, throw an exception to clients, and release the read lock via
// endRegionOperation.
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupting region operations after waiting for close lock for " +
elapsedWaitTime + " ms on " + this + ", " + remainingWaitTime +
" ms remaining");
}
interruptRegionOperations();
}
} while (!acquired && remainingWaitTime > 0);
// If we fail to acquire the lock, trigger an abort if we can; otherwise throw an IOE
// to let the caller know we could not proceed with the close.
if (!acquired) {
String msg = "Failed to acquire close lock on " + this + " after waiting " +
elapsedWaitTime + " ms";
LOG.error(msg);
if (canAbort) {
// If we failed to acquire the write lock, abort the server
rsServices.abort(msg, null);
}
throw new IOException(msg);
}
} else {
long start = EnvironmentEdgeManager.currentTime();
lock.writeLock().lock();
elapsedWaitTime = EnvironmentEdgeManager.currentTime() - start;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Acquired close lock on " + this + " after waiting " +
elapsedWaitTime + " ms");
}
status.setStatus("Disabling writes for close");
try {
if (this.isClosed()) {
@ -1782,7 +1875,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
familyFiles.addAll(storeFiles.getSecond());
}
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
throw throwOnInterrupt(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
@ -4549,6 +4642,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
MiniBatchOperationInProgress<Mutation> miniBatchOp = null;
/** Keep track of the locks we hold so we can release them in finally clause */
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.size());
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
checkInterrupt();
try {
// STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with
// locked rows
@ -4562,20 +4660,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return;
}
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation. Do it before we take the lock and disable interrupts for
// the WAL append.
checkInterrupt();
lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
locked = true;
// From this point until memstore update this operation should not be interrupted.
disableInterrupts();
// STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp
// We should record the timestamp only after we have acquired the rowLock,
// otherwise, newer puts/deletes/increment/append are not guaranteed to have a newer
// timestamp
long now = EnvironmentEdgeManager.currentTime();
batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);
// STEP 3. Build WAL edit
List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);
// STEP 4. Append the WALEdits to WAL and sync.
for(Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
walEdit = nonceKeyWALEditPair.getSecond();
@ -4611,6 +4720,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
releaseRowLocks(acquiredRowLocks);
enableInterrupts();
final int finalLastIndexExclusive =
miniBatchOp != null ? miniBatchOp.getLastIndexExclusive() : batchOp.size();
final boolean finalSuccess = success;
@ -6588,13 +6699,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
success = true;
return result;
} catch (InterruptedException ie) {
LOG.warn("Thread interrupted waiting for lock on row: {}, in region {}", rowKey,
getRegionInfo().getRegionNameAsString());
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
if (LOG.isDebugEnabled()) {
LOG.debug("Thread interrupted waiting for lock on row: {}, in region {}", rowKey,
getRegionInfo().getRegionNameAsString());
}
TraceUtil.addTimelineAnnotation("Interrupted exception getting row lock");
Thread.currentThread().interrupt();
throw iie;
throw throwOnInterrupt(ie);
} catch (Error error) {
// The maximum lock count for read lock is 64K (hardcoded), when this maximum count
// is reached, it will throw out an Error. This Error needs to be caught so it can
@ -7286,6 +7396,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Scanning between column families and thus the scope is between cells
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
do {
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
checkInterrupt();
// We want to maintain any progress that is made towards the limits while scanning across
// different column families. To do this, we toggle the keep progress flag on during calls
// to the StoreScanner to ensure that any progress made thus far is not wiped away.
@ -7384,6 +7498,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
checkInterrupt();
// Let's see what we have in the storeHeap.
Cell current = this.storeHeap.peek();
@ -7464,6 +7582,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return true;
}
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
checkInterrupt();
Cell nextKv = this.storeHeap.peek();
shouldStop = shouldStop(nextKv);
// save that the row was empty before filters applied to it.
@ -7623,6 +7745,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Cell next;
while ((next = this.storeHeap.peek()) != null &&
CellUtil.matchingRows(next, curRowCell)) {
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
checkInterrupt();
this.storeHeap.next(MOCKED_LIST);
}
resetFilters();
@ -8288,6 +8413,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// when it assigns the edit a sequencedid (A.K.A the mvcc write number).
WriteEntry writeEntry = null;
MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
checkInterrupt();
try {
boolean success = false;
try {
@ -8303,9 +8433,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
prevRowLock = rowLock;
}
}
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation. Do it before we take the lock and disable interrupts for
// the WAL append.
checkInterrupt();
// STEP 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
locked = true;
// From this point until memstore update this operation should not be interrupted.
disableInterrupts();
long now = EnvironmentEdgeManager.currentTime();
// STEP 4. Let the processor scan the rows, generate mutations and add waledits
doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
@ -8371,6 +8511,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// release locks if some were acquired but another timed out
releaseRowLocks(acquiredRowLocks);
enableInterrupts();
}
// 12. Run post-process hook
@ -8433,6 +8575,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rowProcessorExecutor.execute(task);
try {
task.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
throw throwOnInterrupt(ie);
} catch (TimeoutException te) {
String row = processor.getRowsToLock().isEmpty() ? "" :
" on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
@ -8528,11 +8672,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return writeEntry;
}
/**
* @return Sorted list of <code>cells</code> using <code>comparator</code>
*/
@ -8558,7 +8697,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
(2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
(3 * ClassSize.ATOMIC_LONG) + // numPutsWithoutWAL, dataInMemoryWithoutWAL,
// compactionsFailed
(2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints
(3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints, regionLockHolders
WriteState.HEAP_SIZE + // writestate
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
@ -8730,12 +8869,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public void startRegionOperation(Operation op) throws IOException {
boolean isInterruptableOp = false;
switch (op) {
case GET: // read operations
case GET: // interruptible read operations
case SCAN:
isInterruptableOp = true;
checkReadsEnabled();
break;
default:
case INCREMENT: // interruptible write operations
case APPEND:
case PUT:
case DELETE:
case BATCH_MUTATE:
case CHECK_AND_MUTATE:
isInterruptableOp = true;
break;
default: // all others
break;
}
if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
@ -8748,6 +8897,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
}
lock(lock.readLock());
// Update regionLockHolders ONLY for any startRegionOperation call that is invoked from
// an RPC handler
Thread thisThread = Thread.currentThread();
if (isInterruptableOp) {
regionLockHolders.put(thisThread, true);
}
if (this.closed.get()) {
lock.readLock().unlock();
throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
@ -8762,6 +8917,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
coprocessorHost.postStartRegionOperation(op);
}
} catch (Exception e) {
if (isInterruptableOp) {
// would be harmless to remove what we didn't add but we know by 'isInterruptableOp'
// if we added this thread to regionLockHolders
regionLockHolders.remove(thisThread);
}
lock.readLock().unlock();
throw new IOException(e);
}
@ -8777,6 +8937,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (operation == Operation.SNAPSHOT) {
stores.values().forEach(HStore::postSnapshotOperation);
}
Thread thisThread = Thread.currentThread();
regionLockHolders.remove(thisThread);
lock.readLock().unlock();
if (coprocessorHost != null) {
coprocessorHost.postCloseRegionOperation(operation);
@ -8792,8 +8954,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws RegionTooBusyException if failed to get the lock in time
* @throws InterruptedIOException if interrupted while waiting for a lock
*/
private void startBulkRegionOperation(boolean writeLockNeeded)
throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
private void startBulkRegionOperation(boolean writeLockNeeded) throws IOException {
if (this.closing.get()) {
throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
}
@ -8804,6 +8965,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
else lock.readLock().unlock();
throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
}
regionLockHolders.put(Thread.currentThread(), true);
}
/**
@ -8811,6 +8973,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* to the try block of #startRegionOperation
*/
private void closeBulkRegionOperation(){
regionLockHolders.remove(Thread.currentThread());
if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
else lock.readLock().unlock();
}
@ -8841,7 +9004,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
dataInMemoryWithoutWAL.add(mutationSize);
}
private void lock(final Lock lock) throws RegionTooBusyException, InterruptedIOException {
private void lock(final Lock lock) throws IOException {
lock(lock, 1);
}
@ -8850,8 +9013,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* if failed to get the lock in time. Throw InterruptedIOException
* if interrupted while waiting for the lock.
*/
private void lock(final Lock lock, final int multiplier)
throws RegionTooBusyException, InterruptedIOException {
private void lock(final Lock lock, final int multiplier) throws IOException {
try {
final long waitTime = Math.min(maxBusyWaitDuration,
busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
@ -8869,10 +9031,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw rtbe;
}
} catch (InterruptedException ie) {
LOG.info("Interrupted while waiting for a lock in region {}", this);
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
throw iie;
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted while waiting for a lock in region {}", this);
}
throw throwOnInterrupt(ie);
}
}
@ -9000,6 +9162,67 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return getReadPoint(IsolationLevel.READ_COMMITTED);
}
/**
* If a handler thread is eligible for interrupt, make it ineligible. Should be paired
* with {@link #enableInterrupts()}.
*/
protected void disableInterrupts() {
regionLockHolders.computeIfPresent(Thread.currentThread(), (t,b) -> false);
}
/**
* If a handler thread was made ineligible for interrupt via {@link #disableInterrupts()},
* make it eligible again. No-op if interrupts are already enabled.
*/
protected void enableInterrupts() {
regionLockHolders.computeIfPresent(Thread.currentThread(), (t,b) -> true);
}
/**
* Interrupt any region operations that have acquired the region lock via
* {@link #startRegionOperation(org.apache.hadoop.hbase.regionserver.Region.Operation)},
* or {@link #startBulkRegionOperation(boolean)}.
*/
private void interruptRegionOperations() {
for (Map.Entry<Thread, Boolean> entry: regionLockHolders.entrySet()) {
// An entry in this map will have a boolean value indicating if it is currently
// eligible for interrupt; if so, we should interrupt it.
if (entry.getValue().booleanValue()) {
entry.getKey().interrupt();
}
}
}
/**
* Check thread interrupt status and throw an exception if interrupted.
* @throws NotServingRegionException if region is closing
* @throws InterruptedIOException if interrupted but region is not closing
*/
// Package scope for tests
void checkInterrupt() throws NotServingRegionException, InterruptedIOException {
if (Thread.interrupted()) {
if (this.closing.get()) {
throw new NotServingRegionException(
getRegionInfo().getRegionNameAsString() + " is closing");
}
throw new InterruptedIOException();
}
}
/**
* Throw the correct exception upon interrupt
* @param t cause
*/
// Package scope for tests
IOException throwOnInterrupt(Throwable t) {
if (this.closing.get()) {
return (NotServingRegionException) new NotServingRegionException(
getRegionInfo().getRegionNameAsString() + " is closing")
.initCause(t);
}
return (InterruptedIOException) new InterruptedIOException().initCause(t);
}
/**
* {@inheritDoc}
*/


@ -199,7 +199,8 @@ public interface Region extends ConfigurationObserver {
*/
enum Operation {
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT, COMPACT_SWITCH
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT, COMPACT_SWITCH,
CHECK_AND_MUTATE
}
/**


@ -1979,14 +1979,15 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
/**
* Create an HRegion that writes to the local tmp dirs with specified wal
* @param info regioninfo
* @param conf configuration
* @param desc table descriptor
* @param wal wal for this region.
* @return created hregion
* @throws IOException
*/
public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc, WAL wal)
throws IOException {
return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal);
public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
WAL wal) throws IOException {
return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
}
/**
@ -2000,14 +2001,15 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
* @throws IOException
*/
public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
return createLocalHRegionWithInMemoryFlags(tableName,startKey, stopKey, isReadOnly,
Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
throws IOException {
return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
durability, wal, null, families);
}
public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
byte[] stopKey, boolean isReadOnly, Durability durability, WAL wal, boolean[] compactedMemStore,
byte[]... families) throws IOException {
byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
boolean[] compactedMemStore, byte[]... families) throws IOException {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
builder.setReadOnly(isReadOnly);
int i = 0;
@ -2027,7 +2029,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
builder.setDurability(durability);
RegionInfo info =
RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
return createLocalHRegion(info, builder.build(), wal);
return createLocalHRegion(info, conf, builder.build(), wal);
}
//


@ -181,7 +181,7 @@ public class TestCacheOnWriteInSchema {
RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
walFactory = new WALFactory(conf, id);
region = TEST_UTIL.createLocalHRegion(info, htd, walFactory.getWAL(info));
region = TEST_UTIL.createLocalHRegion(info, conf, htd, walFactory.getWAL(info));
region.setBlockCache(BlockCacheFactory.createBlockCache(conf));
store = new HStore(region, hcd, conf, false);
}


@ -203,7 +203,7 @@ public class TestFailedAppendAndSync {
boolean threwOnAppend = false;
boolean threwOnBoth = false;
HRegion region = initHRegion(tableName, null, null, dodgyWAL);
HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL);
try {
// Get some random bytes.
byte[] value = Bytes.toBytes(getName());
@ -316,11 +316,11 @@ public class TestFailedAppendAndSync {
* @return A region on which you must call
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
*/
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
throws IOException {
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
Configuration conf, WAL wal) throws IOException {
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
wal, COLUMN_FAMILY_BYTES);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, conf, false,
Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
}
}


@ -30,6 +30,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -136,6 +138,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
@ -354,7 +357,7 @@ public class TestHRegion {
Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
faultyLog.init();
region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, faultyLog,
COLUMN_FAMILY_BYTES);
HStore store = region.getStore(COLUMN_FAMILY_BYTES);
@ -401,8 +404,8 @@ public class TestHRegion {
Path rootDir = new Path(dir + testName);
FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
hLog.init();
region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
COLUMN_FAMILY_BYTES);
region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, hLog,
COLUMN_FAMILY_BYTES);
HStore store = region.getStore(COLUMN_FAMILY_BYTES);
assertEquals(0, region.getMemStoreDataSize());
@ -500,7 +503,7 @@ public class TestHRegion {
HRegion region = null;
try {
// Initialize region
region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, wal,
COLUMN_FAMILY_BYTES);
long size = region.getMemStoreDataSize();
Assert.assertEquals(0, size);
@ -565,7 +568,7 @@ public class TestHRegion {
HRegion region = null;
try {
// Initialize region
region = initHRegion(tableName, null, null, false,
region = initHRegion(tableName, null, null, CONF, false,
Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
long size = region.getMemStoreDataSize();
Assert.assertEquals(0, size);
@ -1055,7 +1058,7 @@ public class TestHRegion {
final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build());
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
HConstants.EMPTY_END_ROW, CONF, false, Durability.USE_DEFAULT, wal, family);
try {
Path regiondir = region.getRegionFileSystem().getRegionDir();
FileSystem fs = region.getRegionFileSystem().getFileSystem();
@ -1260,7 +1263,7 @@ public class TestHRegion {
CommonFSUtils.getRootDir(walConf), method, walConf);
wal.init();
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
HConstants.EMPTY_END_ROW, CONF, false, Durability.USE_DEFAULT, wal, family);
int i = 0;
Put put = new Put(Bytes.toBytes(i));
put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
@ -1291,7 +1294,7 @@ public class TestHRegion {
method, walConf);
wal.init();
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
HConstants.EMPTY_END_ROW, CONF, false, Durability.USE_DEFAULT, wal, family);
region.put(put);
// 3. Test case where ABORT_FLUSH will throw exception.
// Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
@ -3240,7 +3243,7 @@ public class TestHRegion {
hLog.init();
// This chunk creation is done throughout the code base. Do we want to move it into core?
// It is missing from this test. W/o it we NPE.
region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, hLog,
COLUMN_FAMILY_BYTES);
Cell originalCell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
@ -3513,7 +3516,7 @@ public class TestHRegion {
RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
this.region = TEST_UTIL.createLocalHRegion(info, tableDescriptor, wal);
this.region = TEST_UTIL.createLocalHRegion(info, CONF, tableDescriptor, wal);
// Put 4 version to memstore
long ts = 0;
@ -5405,7 +5408,7 @@ public class TestHRegion {
final WALFactory wals = new WALFactory(walConf, HBaseTestingUtility.getRandomUUID().toString());
final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()));
this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, false, tableDurability, wal,
HConstants.EMPTY_END_ROW, CONF, false, tableDurability, wal,
new byte[][] { family });
Put put = new Put(Bytes.toBytes("r1"));
@ -5772,7 +5775,7 @@ public class TestHRegion {
RegionInfo hri =
RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
return initHRegion(tableName, startKey, stopKey, isReadOnly, Durability.SYNC_WAL, wal,
return initHRegion(tableName, startKey, stopKey, conf, isReadOnly, Durability.SYNC_WAL, wal,
families);
}
@ -5781,11 +5784,12 @@ public class TestHRegion {
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
*/
public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
byte[]... families) throws IOException {
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
isReadOnly, durability, wal, families);
conf, isReadOnly, durability, wal, families);
}
/**
@ -6052,9 +6056,9 @@ public class TestHRegion {
byte[] col1 = Bytes.toBytes("col1");
byte[] col2 = Bytes.toBytes("col2");
long ts = 1;
HBaseConfiguration config = new HBaseConfiguration();
config.setInt("test.block.size", 1);
this.region = initHRegion(tableName, method, config, families);
Configuration conf = new Configuration(CONF);
conf.setInt("test.block.size", 1);
this.region = initHRegion(tableName, method, conf, families);
KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
@ -6132,7 +6136,7 @@ public class TestHRegion {
byte[][] families = { cf1, cf2, cf3 };
byte[] col = Bytes.toBytes("C");
long ts = 1;
HBaseConfiguration conf = new HBaseConfiguration();
Configuration conf = new Configuration(CONF);
// disable compactions in this test.
conf.setInt("hbase.hstore.compactionThreshold", 10000);
this.region = initHRegion(tableName, method, conf, families);
@ -6294,7 +6298,7 @@ public class TestHRegion {
byte[][] families = { cf1, cf2, cf3, cf4 };
byte[] col = Bytes.toBytes("C");
long ts = 1;
HBaseConfiguration conf = new HBaseConfiguration();
Configuration conf = new Configuration(CONF);
// disable compactions in this test.
conf.setInt("hbase.hstore.compactionThreshold", 10000);
this.region = initHRegion(tableName, method, conf, families);
@ -6360,7 +6364,7 @@ public class TestHRegion {
byte[] cf1 = Bytes.toBytes("CF1");
byte[][] families = {cf1};
byte[] col = Bytes.toBytes("C");
HBaseConfiguration conf = new HBaseConfiguration();
Configuration conf = new Configuration(CONF);
this.region = initHRegion(tableName, method, conf, families);
// setup with one storefile and one memstore, to create scanner and get an earlier readPt
Put put = new Put(Bytes.toBytes("19998"));
@ -6409,8 +6413,7 @@ public class TestHRegion {
byte[] cf1 = Bytes.toBytes("CF1");
byte[][] families = { cf1 };
byte[] col = Bytes.toBytes("C");
HBaseConfiguration conf = new HBaseConfiguration();
this.region = initHRegion(tableName, method, conf, families);
this.region = initHRegion(tableName, method, CONF, families);
// setup with one storefile and one memstore, to create scanner and get an earlier readPt
Put put = new Put(Bytes.toBytes("19996"));
put.addColumn(cf1, col, Bytes.toBytes("val"));
@ -6462,8 +6465,7 @@ public class TestHRegion {
byte[][] families = { cf1 };
byte[] col = Bytes.toBytes("C");
HBaseConfiguration conf = new HBaseConfiguration();
this.region = initHRegion(tableName, method, conf, families);
this.region = initHRegion(tableName, method, CONF, families);
Put put = new Put(Bytes.toBytes("199996"));
put.addColumn(cf1, col, Bytes.toBytes("val"));
@ -7364,4 +7366,226 @@ public class TestHRegion {
return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
}
}
@Test
public void testCloseNoInterrupt() throws Exception {
byte[] cf1 = Bytes.toBytes("CF1");
byte[][] families = { cf1 };
final int SLEEP_TIME = 10 * 1000;
Configuration conf = new Configuration(CONF);
// Disable close thread interrupt and server abort behavior
conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, false);
conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, 1000);
region = initHRegion(tableName, method, conf, families);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean holderInterrupted = new AtomicBoolean();
Thread holder = new Thread(new Runnable() {
@Override
public void run() {
try {
LOG.info("Starting region operation holder");
region.startRegionOperation(Operation.SCAN);
latch.countDown();
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
LOG.info("Interrupted");
holderInterrupted.set(true);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
region.closeRegionOperation();
} catch (IOException e) {
}
LOG.info("Stopped region operation holder");
}
}
});
holder.start();
latch.await();
region.close();
region = null;
holder.join();
assertFalse("Region lock holder should not have been interrupted", holderInterrupted.get());
}
@Test
public void testCloseInterrupt() throws Exception {
byte[] cf1 = Bytes.toBytes("CF1");
byte[][] families = { cf1 };
final int SLEEP_TIME = 10 * 1000;
Configuration conf = new Configuration(CONF);
// Enable close thread interrupt and server abort behavior
conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
// Speed up the unit test, no need to wait default 10 seconds.
conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, 1000);
region = initHRegion(tableName, method, conf, families);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean holderInterrupted = new AtomicBoolean();
Thread holder = new Thread(new Runnable() {
@Override
public void run() {
try {
LOG.info("Starting region operation holder");
region.startRegionOperation(Operation.SCAN);
latch.countDown();
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
LOG.info("Interrupted");
holderInterrupted.set(true);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
region.closeRegionOperation();
} catch (IOException e) {
}
LOG.info("Stopped region operation holder");
}
}
});
holder.start();
latch.await();
region.close();
region = null;
holder.join();
assertTrue("Region lock holder was not interrupted", holderInterrupted.get());
}
@Test
public void testCloseAbort() throws Exception {
byte[] cf1 = Bytes.toBytes("CF1");
byte[][] families = { cf1 };
final int SLEEP_TIME = 10 * 1000;
Configuration conf = new Configuration(CONF);
// Enable close thread interrupt and server abort behavior.
conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
// Set the abort interval to a fraction of sleep time so we are guaranteed to be aborted.
conf.setInt(HRegion.CLOSE_WAIT_TIME, SLEEP_TIME / 2);
// Set the wait interval to a fraction of sleep time so we are guaranteed to be interrupted.
conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, SLEEP_TIME / 4);
region = initHRegion(tableName, method, conf, families);
RegionServerServices rsServices = mock(RegionServerServices.class);
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost", 1000, 1000));
region.rsServices = rsServices;
final CountDownLatch latch = new CountDownLatch(1);
Thread holder = new Thread(new Runnable() {
@Override
public void run() {
try {
LOG.info("Starting region operation holder");
region.startRegionOperation(Operation.SCAN);
latch.countDown();
// Hold the lock for SLEEP_TIME ms no matter how many times we are interrupted.
int timeRemaining = SLEEP_TIME;
while (timeRemaining > 0) {
long start = EnvironmentEdgeManager.currentTime();
try {
Thread.sleep(timeRemaining);
} catch (InterruptedException e) {
LOG.info("Interrupted");
}
long end = EnvironmentEdgeManager.currentTime();
timeRemaining -= end - start;
if (timeRemaining < 0) {
timeRemaining = 0;
}
if (timeRemaining > 0) {
LOG.info("Sleeping again, remaining time " + timeRemaining + " ms");
}
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
region.closeRegionOperation();
} catch (IOException e) {
}
LOG.info("Stopped region operation holder");
}
}
});
holder.start();
latch.await();
try {
region.close();
} catch (IOException e) {
LOG.info("Caught expected exception", e);
}
region = null;
holder.join();
// Verify the region tried to abort the server
verify(rsServices, atLeast(1)).abort(anyString(),any());
}
@Test
public void testInterruptProtection() throws Exception {
byte[] cf1 = Bytes.toBytes("CF1");
byte[][] families = { cf1 };
final int SLEEP_TIME = 10 * 1000;
Configuration conf = new Configuration(CONF);
// Enable close thread interrupt and server abort behavior.
conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, 1000);
region = initHRegion(tableName, method, conf, families);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean holderInterrupted = new AtomicBoolean();
Thread holder = new Thread(new Runnable() {
@Override
public void run() {
try {
LOG.info("Starting region operation holder");
region.startRegionOperation(Operation.SCAN);
LOG.info("Protecting against interrupts");
region.disableInterrupts();
try {
latch.countDown();
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
LOG.info("Interrupted");
holderInterrupted.set(true);
}
} finally {
region.enableInterrupts();
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
region.closeRegionOperation();
} catch (IOException e) {
}
LOG.info("Stopped region operation holder");
}
}
});
holder.start();
latch.await();
region.close();
region = null;
holder.join();
assertFalse("Region lock holder should not have been interrupted", holderInterrupted.get());
}
}


@ -1709,6 +1709,6 @@ public class TestHRegionReplayEvents {
private static HRegion initHRegion(byte[] tableName, byte[]... families) throws IOException {
return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, false, Durability.SYNC_WAL, null, families);
HConstants.EMPTY_END_ROW, CONF, false, Durability.SYNC_WAL, null, families);
}
}


@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
@ -51,7 +53,8 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion {
*/
@Override
public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
throws IOException {
boolean[] inMemory = new boolean[families.length];
for(int i = 0; i < inMemory.length; i++) {
inMemory[i] = true;
@ -59,7 +62,7 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion {
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
return TEST_UTIL.createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey,
isReadOnly, durability, wal, inMemory, families);
conf, isReadOnly, durability, wal, inMemory, families);
}
@Override int getTestCountForTestWritesWhileScanning() {


@ -87,7 +87,7 @@ public class TestRegionIncrement {
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_BYTE_ARRAY,
HConstants.EMPTY_BYTE_ARRAY, false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
HConstants.EMPTY_BYTE_ARRAY, conf, false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
}
private void closeRegion(final HRegion region) throws IOException {


@ -0,0 +1,363 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({RegionServerTests.class, LargeTests.class})
public class TestRegionInterrupt {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionInterrupt.class);
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final Logger LOG = LoggerFactory.getLogger(TestRegionInterrupt.class);
static final byte[] FAMILY = Bytes.toBytes("info");
static long sleepTime;
@Rule
public TableNameTestRule name = new TableNameTestRule();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class, Region.class);
conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
// Ensure the sleep interval is long enough for interrupts to occur.
long waitInterval = conf.getLong(HRegion.CLOSE_WAIT_INTERVAL,
HRegion.DEFAULT_CLOSE_WAIT_INTERVAL);
sleepTime = waitInterval * 2;
// Try to bound the running time of this unit if expected actions do not take place.
conf.setLong(HRegion.CLOSE_WAIT_TIME, sleepTime * 2);
}
@Before
public void setUp() throws Exception {
TEST_UTIL.startMiniCluster();
}
@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testCloseInterruptScanning() throws Exception {
final TableName tableName = name.getTableName();
LOG.info("Creating table " + tableName);
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
// load some data
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
TEST_UTIL.loadTable(table, FAMILY);
final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
// scan the table in the background
Thread scanner = new Thread(new Runnable() {
@Override
public void run() {
Scan scan = new Scan();
scan.addFamily(FAMILY);
scan.setFilter(new DelayingFilter());
try {
LOG.info("Starting scan");
try (ResultScanner rs = table.getScanner(scan)) {
Result r;
do {
r = rs.next();
if (r != null) {
LOG.info("Scanned row " + Bytes.toStringBinary(r.getRow()));
}
} while (r != null);
}
} catch (IOException e) {
LOG.info("Scanner caught exception", e);
expectedExceptionCaught.set(true);
} finally {
LOG.info("Finished scan");
}
}
});
scanner.start();
// Wait for the filter to begin sleeping
LOG.info("Waiting for scanner to start");
Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return DelayingFilter.isSleeping();
}
});
// Offline the table, this will trigger closing
LOG.info("Offlining table " + tableName);
TEST_UTIL.getAdmin().disableTable(tableName);
// Wait for scanner termination
scanner.join();
// When we get here the region has closed and the table is offline
assertTrue("Region operations were not interrupted",
InterruptInterceptingHRegion.wasInterrupted());
assertTrue("Scanner did not catch expected exception", expectedExceptionCaught.get());
}
}
@Test
public void testCloseInterruptMutation() throws Exception {
final TableName tableName = name.getTableName();
final Admin admin = TEST_UTIL.getAdmin();
// Create the test table
TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
.setCoprocessor(MutationDelayingCoprocessor.class.getName())
.build();
LOG.info("Creating table " + tableName);
admin.createTable(htd);
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
// Insert some data in the background
LOG.info("Starting writes to table " + tableName);
final int NUM_ROWS = 100;
final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
Thread inserter = new Thread(new Runnable() {
@Override
public void run() {
try (BufferedMutator t = admin.getConnection().getBufferedMutator(tableName)) {
for (int i = 0; i < NUM_ROWS; i++) {
LOG.info("Writing row " + i + " to " + tableName);
byte[] value = new byte[10], row = Bytes.toBytes(Integer.toString(i));
Bytes.random(value);
t.mutate(new Put(row).addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, value));
t.flush();
}
} catch (IOException e) {
LOG.info("Inserter caught exception", e);
expectedExceptionCaught.set(true);
}
}
});
inserter.start();
// Wait for delayed insertion to begin
LOG.info("Waiting for mutations to start");
Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return MutationDelayingCoprocessor.isSleeping();
}
});
// Offline the table, this will trigger closing
LOG.info("Offlining table " + tableName);
admin.disableTable(tableName);
// Wait for the inserter to finish
inserter.join();
// When we get here the region has closed and the table is offline
assertTrue("Region operations were not interrupted",
InterruptInterceptingHRegion.wasInterrupted());
assertTrue("Inserter did not catch expected exception", expectedExceptionCaught.get());
}
public static class InterruptInterceptingHRegion extends HRegion {
private static boolean interrupted = false;
public static boolean wasInterrupted() {
return interrupted;
}
public InterruptInterceptingHRegion(Path tableDir, WAL wal, FileSystem fs,
Configuration conf, RegionInfo regionInfo, TableDescriptor htd,
RegionServerServices rsServices) {
super(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
}
public InterruptInterceptingHRegion(HRegionFileSystem fs, WAL wal, Configuration conf,
TableDescriptor htd, RegionServerServices rsServices) {
super(fs, wal, conf, htd, rsServices);
}
@Override
void checkInterrupt() throws NotServingRegionException, InterruptedIOException {
try {
super.checkInterrupt();
} catch (NotServingRegionException | InterruptedIOException e) {
interrupted = true;
throw e;
}
}
@Override
IOException throwOnInterrupt(Throwable t) {
interrupted = true;
return super.throwOnInterrupt(t);
}
}
public static class DelayingFilter extends FilterBase {
static volatile boolean sleeping = false;
public static boolean isSleeping() {
return sleeping;
}
@Override
public ReturnCode filterCell(Cell v) throws IOException {
LOG.info("Starting sleep on " + v);
sleeping = true;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
// restore interrupt status so region scanner can handle it as expected
Thread.currentThread().interrupt();
LOG.info("Interrupted during sleep on " + v);
} finally {
LOG.info("Done sleep on " + v);
sleeping = false;
}
return ReturnCode.INCLUDE;
}
public static DelayingFilter parseFrom(final byte [] pbBytes)
throws DeserializationException {
// Just return a new instance.
return new DelayingFilter();
}
}
public static class MutationDelayingCoprocessor implements RegionCoprocessor, RegionObserver {
static volatile boolean sleeping = false;
public static boolean isSleeping() {
return sleeping;
}
private void doSleep(Region.Operation op) {
LOG.info("Starting sleep for " + op);
sleeping = true;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
// restore interrupt status so doMiniBatchMutation etc. can handle it as expected
Thread.currentThread().interrupt();
LOG.info("Interrupted during " + op);
} finally {
LOG.info("Done");
sleeping = false;
}
}
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
Durability durability) throws IOException {
doSleep(Region.Operation.PUT);
RegionObserver.super.prePut(c, put, edit, durability);
}
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
WALEdit edit, Durability durability) throws IOException {
doSleep(Region.Operation.DELETE);
RegionObserver.super.preDelete(c, delete, edit, durability);
}
@Override
public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
throws IOException {
doSleep(Region.Operation.APPEND);
return RegionObserver.super.preAppend(c, append);
}
@Override
public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
throws IOException {
doSleep(Region.Operation.INCREMENT);
return RegionObserver.super.preIncrement(c, increment);
}
}
}


@ -226,7 +226,7 @@ public class TestWALLockup {
// There is no 'stop' once a logRoller is running.. it just dies.
logRoller.start();
// Now get a region and start adding in edits.
final HRegion region = initHRegion(tableName, null, null, dodgyWAL);
final HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL);
byte [] bytes = Bytes.toBytes(getName());
NavigableMap<byte[], Integer> scopes = new TreeMap<>(
Bytes.BYTES_COMPARATOR);
@ -557,11 +557,11 @@ public class TestWALLockup {
* @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
* when done.
*/
private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
throws IOException {
private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
Configuration conf, WAL wal) throws IOException {
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
wal, COLUMN_FAMILY_BYTES);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, conf, false,
Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
}
}


@ -563,7 +563,7 @@ public abstract class AbstractTestFSWAL {
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
TEST_UTIL.createLocalHRegion(hri, CONF, htd, wal).close();
RegionServerServices rsServices = mock(RegionServerServices.class);
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
when(rsServices.getConfiguration()).thenReturn(conf);


@ -168,7 +168,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log);
ExecutorService exec = Executors.newFixedThreadPool(2);
// do a regular write first because of memstore size calculation.


@ -89,7 +89,7 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
FileSystem fs = FileSystem.get(conf);
Path rootDir = new Path(dir + getName());
T wal = getWAL(fs, rootDir, getName(), conf);
HRegion region = initHRegion(tableName, null, null, wal);
HRegion region = initHRegion(tableName, null, null, conf, wal);
try {
resetSyncFlag(wal);
assertNull(getSyncFlag(wal));
@ -114,7 +114,7 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
fs = FileSystem.get(conf);
wal = getWAL(fs, rootDir, getName(), conf);
region = initHRegion(tableName, null, null, wal);
region = initHRegion(tableName, null, null, conf, wal);
try {
resetSyncFlag(wal);
@ -156,11 +156,11 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
* @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
* when done.
*/
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
throws IOException {
public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
Configuration conf, WAL wal) throws IOException {
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.USE_DEFAULT,
wal, COLUMN_FAMILY_BYTES);
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, conf, false,
Durability.USE_DEFAULT, wal, COLUMN_FAMILY_BYTES);
}
}