diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 57a1e1f5de9..bca18dbcb01 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -688,7 +688,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Last flush time for each Store. Useful when we are flushing for each column private final ConcurrentMap lastStoreFlushTimeMap = new ConcurrentHashMap<>(); - final RegionServerServices rsServices; + protected RegionServerServices rsServices; private RegionServerAccounting rsAccounting; private long flushCheckInterval; // flushPerChanges is to prevent too many changes in memstore @@ -696,6 +696,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private long blockingMemStoreSize; // Used to guard closes final ReentrantReadWriteLock lock; + // Used to track interruptible holders of the region lock. Currently that is only RPC handler + // threads. Boolean value in map determines if lock holder can be interrupted, normally true, + // but may be false when thread is transiting a critical section. 
+ final ConcurrentHashMap regionLockHolders; // Stop updates lock private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock(); @@ -788,6 +792,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MetaCellComparator.META_COMPARATOR : CellComparatorImpl.COMPARATOR; this.lock = new ReentrantReadWriteLock(conf.getBoolean(FAIR_REENTRANT_CLOSE_LOCK, DEFAULT_FAIR_REENTRANT_CLOSE_LOCK)); + this.regionLockHolders = new ConcurrentHashMap<>(); this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL, DEFAULT_CACHE_FLUSH_INTERVAL); this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES); @@ -1174,7 +1179,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi LOG.info("Setting FlushNonSloppyStoresFirstPolicy for the region=" + this); } } catch (InterruptedException e) { - throw (InterruptedIOException)new InterruptedIOException().initCause(e); + throw throwOnInterrupt(e); } catch (ExecutionException e) { throw new IOException(e.getCause()); } finally { @@ -1578,6 +1583,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G + public static final String CLOSE_WAIT_ABORT = "hbase.regionserver.close.wait.abort"; + public static final boolean DEFAULT_CLOSE_WAIT_ABORT = true; + public static final String CLOSE_WAIT_TIME = "hbase.regionserver.close.wait.time.ms"; + public static final long DEFAULT_CLOSE_WAIT_TIME = 60000; // 1 minute + public static final String CLOSE_WAIT_INTERVAL = "hbase.regionserver.close.wait.interval.ms"; + public static final long DEFAULT_CLOSE_WAIT_INTERVAL = 10000; // 10 seconds + public Map> close(boolean abort) throws IOException { return close(abort, false); } @@ -1679,22 +1691,103 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - if (timeoutForWriteLock == null - || timeoutForWriteLock 
== Long.MAX_VALUE) { - // block waiting for the lock for closing - lock.writeLock().lock(); // FindBugs: Complains UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine - } else { - try { - boolean succeed = lock.writeLock().tryLock(timeoutForWriteLock, TimeUnit.SECONDS); - if (!succeed) { - throw new IOException("Failed to get write lock when closing region"); - } - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } - } + // Set the closing flag + // From this point new arrivals at the region lock will get NSRE. + this.closing.set(true); LOG.info("Closing region {}", this); + + // Acquire the close lock + + // The configuration parameter CLOSE_WAIT_ABORT is overloaded to enable both + // the new regionserver abort condition and interrupts for running requests. + // If CLOSE_WAIT_ABORT is not enabled there is no change from earlier behavior, + // we will not attempt to interrupt threads servicing requests nor crash out + // the regionserver if something remains stubborn. + + final boolean canAbort = conf.getBoolean(CLOSE_WAIT_ABORT, DEFAULT_CLOSE_WAIT_ABORT); + boolean useTimedWait = false; + if (timeoutForWriteLock != null && timeoutForWriteLock != Long.MAX_VALUE) { + // convert legacy use of timeoutForWriteLock in seconds to new use in millis + timeoutForWriteLock = TimeUnit.SECONDS.toMillis(timeoutForWriteLock); + useTimedWait = true; + } else if (canAbort) { + timeoutForWriteLock = conf.getLong(CLOSE_WAIT_TIME, DEFAULT_CLOSE_WAIT_TIME); + useTimedWait = true; + } + if (LOG.isDebugEnabled()) { + LOG.debug((useTimedWait ? 
"Time limited wait" : "Waiting without time limit") + + " for close lock on " + this); + } + final long closeWaitInterval = conf.getLong(CLOSE_WAIT_INTERVAL, DEFAULT_CLOSE_WAIT_INTERVAL); + long elapsedWaitTime = 0; + if (useTimedWait) { + // Sanity check configuration + long remainingWaitTime = timeoutForWriteLock; + if (remainingWaitTime < closeWaitInterval) { + LOG.warn("Time limit for close wait of " + timeoutForWriteLock + + " ms is less than the configured lock acquisition wait interval " + + closeWaitInterval + " ms, using wait interval as time limit"); + remainingWaitTime = closeWaitInterval; + } + boolean acquired = false; + do { + long start = EnvironmentEdgeManager.currentTime(); + try { + acquired = lock.writeLock().tryLock(Math.min(remainingWaitTime, closeWaitInterval), + TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // Interrupted waiting for close lock. More likely the server is shutting down, not + // normal operation, so aborting upon interrupt while waiting on this lock would not + // provide much value. Throw an IOE (as IIOE) like we would in the case where we + // fail to acquire the lock. + String msg = "Interrupted while waiting for close lock on " + this; + LOG.warn(msg, e); + throw (InterruptedIOException) new InterruptedIOException(msg).initCause(e); + } + long elapsed = EnvironmentEdgeManager.currentTime() - start; + elapsedWaitTime += elapsed; + remainingWaitTime -= elapsed; + if (canAbort && !acquired && remainingWaitTime > 0) { + // Before we loop to wait again, interrupt all region operations that might + // still be in progress, to encourage them to break out of waiting states or + // inner loops, throw an exception to clients, and release the read lock via + // endRegionOperation. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("Interrupting region operations after waiting for close lock for " + + elapsedWaitTime + " ms on " + this + ", " + remainingWaitTime + + " ms remaining"); + } + interruptRegionOperations(); + } + } while (!acquired && remainingWaitTime > 0); + + // If we fail to acquire the lock, trigger an abort if we can; otherwise throw an IOE + // to let the caller know we could not proceed with the close. + if (!acquired) { + String msg = "Failed to acquire close lock on " + this + " after waiting " + + elapsedWaitTime + " ms"; + LOG.error(msg); + if (canAbort) { + // If we failed to acquire the write lock, abort the server + rsServices.abort(msg, null); + } + throw new IOException(msg); + } + + } else { + + long start = EnvironmentEdgeManager.currentTime(); + lock.writeLock().lock(); + elapsedWaitTime = EnvironmentEdgeManager.currentTime() - start; + + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Acquired close lock on " + this + " after waiting " + + elapsedWaitTime + " ms"); + } + status.setStatus("Disabling writes for close"); try { if (this.isClosed()) { @@ -1782,7 +1875,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi familyFiles.addAll(storeFiles.getSecond()); } } catch (InterruptedException e) { - throw (InterruptedIOException)new InterruptedIOException().initCause(e); + throw throwOnInterrupt(e); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof IOException) { @@ -4549,6 +4642,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MiniBatchOperationInProgress miniBatchOp = null; /** Keep track of the locks we hold so we can release them in finally clause */ List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.size()); + + // Check for thread interrupt status in case we have been signaled from + // #interruptRegionOperation. + checkInterrupt(); + try { // STEP 1. 
Try to acquire as many locks as we can and build mini-batch of operations with // locked rows @@ -4562,20 +4660,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return; } + // Check for thread interrupt status in case we have been signaled from + // #interruptRegionOperation. Do it before we take the lock and disable interrupts for + // the WAL append. + checkInterrupt(); + lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount()); locked = true; + // From this point until memstore update this operation should not be interrupted. + disableInterrupts(); + // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp // We should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes/increment/append are not guaranteed to have a newer // timestamp + long now = EnvironmentEdgeManager.currentTime(); batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks); // STEP 3. Build WAL edit + List> walEdits = batchOp.buildWALEdits(miniBatchOp); // STEP 4. Append the WALEdits to WAL and sync. + for(Iterator> it = walEdits.iterator(); it.hasNext();) { Pair nonceKeyWALEditPair = it.next(); walEdit = nonceKeyWALEditPair.getSecond(); @@ -4611,6 +4720,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } releaseRowLocks(acquiredRowLocks); + enableInterrupts(); + final int finalLastIndexExclusive = miniBatchOp != null ? 
miniBatchOp.getLastIndexExclusive() : batchOp.size(); final boolean finalSuccess = success; @@ -6588,13 +6699,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi success = true; return result; } catch (InterruptedException ie) { - LOG.warn("Thread interrupted waiting for lock on row: {}, in region {}", rowKey, - getRegionInfo().getRegionNameAsString()); - InterruptedIOException iie = new InterruptedIOException(); - iie.initCause(ie); + if (LOG.isDebugEnabled()) { + LOG.debug("Thread interrupted waiting for lock on row: {}, in region {}", rowKey, + getRegionInfo().getRegionNameAsString()); + } TraceUtil.addTimelineAnnotation("Interrupted exception getting row lock"); - Thread.currentThread().interrupt(); - throw iie; + throw throwOnInterrupt(ie); } catch (Error error) { // The maximum lock count for read lock is 64K (hardcoded), when this maximum count // is reached, it will throw out an Error. This Error needs to be caught so it can @@ -7286,6 +7396,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Scanning between column families and thus the scope is between cells LimitScope limitScope = LimitScope.BETWEEN_CELLS; do { + // Check for thread interrupt status in case we have been signaled from + // #interruptRegionOperation. + checkInterrupt(); + // We want to maintain any progress that is made towards the limits while scanning across // different column families. To do this, we toggle the keep progress flag on during calls // to the StoreScanner to ensure that any progress made thus far is not wiped away. @@ -7384,6 +7498,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + // Check for thread interrupt status in case we have been signaled from + // #interruptRegionOperation. + checkInterrupt(); + // Let's see what we have in the storeHeap. 
Cell current = this.storeHeap.peek(); @@ -7464,6 +7582,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return true; } + // Check for thread interrupt status in case we have been signaled from + // #interruptRegionOperation. + checkInterrupt(); + Cell nextKv = this.storeHeap.peek(); shouldStop = shouldStop(nextKv); // save that the row was empty before filters applied to it. @@ -7623,6 +7745,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Cell next; while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) { + // Check for thread interrupt status in case we have been signaled from + // #interruptRegionOperation. + checkInterrupt(); this.storeHeap.next(MOCKED_LIST); } resetFilters(); @@ -8288,6 +8413,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // when it assigns the edit a sequencedid (A.K.A the mvcc write number). WriteEntry writeEntry = null; MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing(); + + // Check for thread interrupt status in case we have been signaled from + // #interruptRegionOperation. + checkInterrupt(); + try { boolean success = false; try { @@ -8303,9 +8433,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi prevRowLock = rowLock; } } + + // Check for thread interrupt status in case we have been signaled from + // #interruptRegionOperation. Do it before we take the lock and disable interrupts for + // the WAL append. + checkInterrupt(); + // STEP 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size()); locked = true; + + // From this point until memstore update this operation should not be interrupted. + disableInterrupts(); + long now = EnvironmentEdgeManager.currentTime(); // STEP 4. 
Let the processor scan the rows, generate mutations and add waledits doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); @@ -8371,6 +8511,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // release locks if some were acquired but another timed out releaseRowLocks(acquiredRowLocks); + + enableInterrupts(); } // 12. Run post-process hook @@ -8433,6 +8575,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rowProcessorExecutor.execute(task); try { task.get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + throw throwOnInterrupt(ie); } catch (TimeoutException te) { String row = processor.getRowsToLock().isEmpty() ? "" : " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "..."; @@ -8528,11 +8672,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return writeEntry; } - - - - - /** * @return Sorted list of cells using comparator */ @@ -8558,7 +8697,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing (3 * ClassSize.ATOMIC_LONG) + // numPutsWithoutWAL, dataInMemoryWithoutWAL, // compactionsFailed - (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints + (3 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints, regionLockHolders WriteState.HEAP_SIZE + // writestate ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock @@ -8730,12 +8869,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void startRegionOperation(Operation op) throws IOException { + boolean isInterruptableOp = false; switch (op) { - case GET: // read operations + case GET: // interruptible read operations case SCAN: + isInterruptableOp = true; checkReadsEnabled(); break; - 
default: + case INCREMENT: // interruptible write operations + case APPEND: + case PUT: + case DELETE: + case BATCH_MUTATE: + case CHECK_AND_MUTATE: + isInterruptableOp = true; + break; + default: // all others break; } if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION @@ -8748,6 +8897,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing"); } lock(lock.readLock()); + // Update regionLockHolders ONLY for any startRegionOperation call that is invoked from + // an RPC handler + Thread thisThread = Thread.currentThread(); + if (isInterruptableOp) { + regionLockHolders.put(thisThread, true); + } if (this.closed.get()) { lock.readLock().unlock(); throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed"); @@ -8762,6 +8917,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi coprocessorHost.postStartRegionOperation(op); } } catch (Exception e) { + if (isInterruptableOp) { + // would be harmless to remove what we didn't add but we know by 'isInterruptableOp' + // if we added this thread to regionLockHolders + regionLockHolders.remove(thisThread); + } lock.readLock().unlock(); throw new IOException(e); } @@ -8777,6 +8937,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (operation == Operation.SNAPSHOT) { stores.values().forEach(HStore::postSnapshotOperation); } + Thread thisThread = Thread.currentThread(); + regionLockHolders.remove(thisThread); lock.readLock().unlock(); if (coprocessorHost != null) { coprocessorHost.postCloseRegionOperation(operation); @@ -8792,8 +8954,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @throws RegionTooBusyException if failed to get the lock in time * @throws InterruptedIOException if interrupted while waiting for a lock */ - private void 
startBulkRegionOperation(boolean writeLockNeeded) - throws NotServingRegionException, RegionTooBusyException, InterruptedIOException { + private void startBulkRegionOperation(boolean writeLockNeeded) throws IOException { if (this.closing.get()) { throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing"); } @@ -8804,6 +8965,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi else lock.readLock().unlock(); throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed"); } + regionLockHolders.put(Thread.currentThread(), true); } /** @@ -8811,6 +8973,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * to the try block of #startRegionOperation */ private void closeBulkRegionOperation(){ + regionLockHolders.remove(Thread.currentThread()); if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock(); else lock.readLock().unlock(); } @@ -8841,7 +9004,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi dataInMemoryWithoutWAL.add(mutationSize); } - private void lock(final Lock lock) throws RegionTooBusyException, InterruptedIOException { + private void lock(final Lock lock) throws IOException { lock(lock, 1); } @@ -8850,8 +9013,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * if failed to get the lock in time. Throw InterruptedIOException * if interrupted while waiting for the lock. 
*/ - private void lock(final Lock lock, final int multiplier) - throws RegionTooBusyException, InterruptedIOException { + private void lock(final Lock lock, final int multiplier) throws IOException { try { final long waitTime = Math.min(maxBusyWaitDuration, busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier)); @@ -8869,10 +9031,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw rtbe; } } catch (InterruptedException ie) { - LOG.info("Interrupted while waiting for a lock in region {}", this); - InterruptedIOException iie = new InterruptedIOException(); - iie.initCause(ie); - throw iie; + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted while waiting for a lock in region {}", this); + } + throw throwOnInterrupt(ie); } } @@ -9000,6 +9162,67 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return getReadPoint(IsolationLevel.READ_COMMITTED); } + /** + * If a handler thread is eligible for interrupt, make it ineligible. Should be paired + * with {@link #enableInterrupts()}. + */ + protected void disableInterrupts() { + regionLockHolders.computeIfPresent(Thread.currentThread(), (t,b) -> false); + } + + /** + * If a handler thread was made ineligible for interrupt via {@link #disableInterrupts()}, + * make it eligible again. No-op if interrupts are already enabled. + */ + protected void enableInterrupts() { + regionLockHolders.computeIfPresent(Thread.currentThread(), (t,b) -> true); + } + + /** + * Interrupt any region operations that have acquired the region lock via + * {@link #startRegionOperation(org.apache.hadoop.hbase.regionserver.Region.Operation)}, + * or {@link #startBulkRegionOperation(boolean)}. + */ + private void interruptRegionOperations() { + for (Map.Entry entry: regionLockHolders.entrySet()) { + // An entry in this map will have a boolean value indicating if it is currently + // eligible for interrupt; if so, we should interrupt it. 
+ if (entry.getValue().booleanValue()) { + entry.getKey().interrupt(); + } + } + } + + /** + * Check thread interrupt status and throw an exception if interrupted. + * @throws NotServingRegionException if region is closing + * @throws InterruptedIOException if interrupted but region is not closing + */ + // Package scope for tests + void checkInterrupt() throws NotServingRegionException, InterruptedIOException { + if (Thread.interrupted()) { + if (this.closing.get()) { + throw new NotServingRegionException( + getRegionInfo().getRegionNameAsString() + " is closing"); + } + throw new InterruptedIOException(); + } + } + + /** + * Throw the correct exception upon interrupt + * @param t cause + */ + // Package scope for tests + IOException throwOnInterrupt(Throwable t) { + if (this.closing.get()) { + return (NotServingRegionException) new NotServingRegionException( + getRegionInfo().getRegionNameAsString() + " is closing") + .initCause(t); + } + return (InterruptedIOException) new InterruptedIOException().initCause(t); + } + /** * {@inheritDoc} */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 79df0013e08..900e5711415 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -199,7 +199,8 @@ public interface Region extends ConfigurationObserver { */ enum Operation { ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, - REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT, COMPACT_SWITCH + REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT, COMPACT_SWITCH, + CHECK_AND_MUTATE } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index cb2e9e92893..29e88837290 
100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1979,14 +1979,15 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { /** * Create an HRegion that writes to the local tmp dirs with specified wal * @param info regioninfo + * @param conf configuration * @param desc table descriptor * @param wal wal for this region. * @return created hregion * @throws IOException */ - public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc, WAL wal) - throws IOException { - return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal); + public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc, + WAL wal) throws IOException { + return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal); } /** @@ -2000,14 +2001,15 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { * @throws IOException */ public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey, - boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException { - return createLocalHRegionWithInMemoryFlags(tableName,startKey, stopKey, isReadOnly, + Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families) + throws IOException { + return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly, durability, wal, null, families); } public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey, - byte[] stopKey, boolean isReadOnly, Durability durability, WAL wal, boolean[] compactedMemStore, - byte[]... families) throws IOException { + byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal, + boolean[] compactedMemStore, byte[]... 
families) throws IOException { TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); builder.setReadOnly(isReadOnly); int i = 0; @@ -2027,7 +2029,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { builder.setDurability(durability); RegionInfo info = RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build(); - return createLocalHRegion(info, builder.build(), wal); + return createLocalHRegion(info, conf, builder.build(), wal); } // diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java index 3c915537119..60ca5b3896b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java @@ -181,7 +181,7 @@ public class TestCacheOnWriteInSchema { RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); walFactory = new WALFactory(conf, id); - region = TEST_UTIL.createLocalHRegion(info, htd, walFactory.getWAL(info)); + region = TEST_UTIL.createLocalHRegion(info, conf, htd, walFactory.getWAL(info)); region.setBlockCache(BlockCacheFactory.createBlockCache(conf)); store = new HStore(region, hcd, conf, false); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index fdf96dab87f..dab82144f04 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -203,7 +203,7 @@ public class TestFailedAppendAndSync { boolean threwOnAppend = false; boolean threwOnBoth = false; - HRegion region = initHRegion(tableName, 
null, null, dodgyWAL); + HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL); try { // Get some random bytes. byte[] value = Bytes.toBytes(getName()); @@ -316,11 +316,11 @@ public class TestFailedAppendAndSync { * @return A region on which you must call * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. */ - public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) - throws IOException { + public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, + Configuration conf, WAL wal) throws IOException { ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); - return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL, - wal, COLUMN_FAMILY_BYTES); + return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, conf, false, + Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index bbc73e3bda5..da3f2204ddd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -30,6 +30,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -136,6 +138,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import 
org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; +import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; @@ -354,7 +357,7 @@ public class TestHRegion { Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); faultyLog.init(); - region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog, + region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); HStore store = region.getStore(COLUMN_FAMILY_BYTES); @@ -401,8 +404,8 @@ public class TestHRegion { Path rootDir = new Path(dir + testName); FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF); hLog.init(); - region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, - COLUMN_FAMILY_BYTES); + region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, hLog, + COLUMN_FAMILY_BYTES); HStore store = region.getStore(COLUMN_FAMILY_BYTES); assertEquals(0, region.getMemStoreDataSize()); @@ -500,7 +503,7 @@ public class TestHRegion { HRegion region = null; try { // Initialize region - region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal, + region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); long size = region.getMemStoreDataSize(); Assert.assertEquals(0, size); @@ -565,7 +568,7 @@ public class TestHRegion { HRegion region = null; try { // Initialize region - region = initHRegion(tableName, null, null, false, + region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); long size = 
region.getMemStoreDataSize(); Assert.assertEquals(0, size); @@ -1055,7 +1058,7 @@ public class TestHRegion { final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()); this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); + HConstants.EMPTY_END_ROW, CONF, false, Durability.USE_DEFAULT, wal, family); try { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); @@ -1260,7 +1263,7 @@ public class TestHRegion { CommonFSUtils.getRootDir(walConf), method, walConf); wal.init(); this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); + HConstants.EMPTY_END_ROW, CONF, false, Durability.USE_DEFAULT, wal, family); int i = 0; Put put = new Put(Bytes.toBytes(i)); put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal @@ -1291,7 +1294,7 @@ public class TestHRegion { method, walConf); wal.init(); this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family); + HConstants.EMPTY_END_ROW, CONF, false, Durability.USE_DEFAULT, wal, family); region.put(put); // 3. Test case where ABORT_FLUSH will throw exception. // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with @@ -3240,7 +3243,7 @@ public class TestHRegion { hLog.init(); // This chunk creation is done throughout the code base. Do we want to move it into core? // It is missing from this test. W/o it we NPE. 
- region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, + region = initHRegion(tableName, null, null, CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES); Cell originalCell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) @@ -3513,7 +3516,7 @@ public class TestHRegion { RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(); Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log"); final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info); - this.region = TEST_UTIL.createLocalHRegion(info, tableDescriptor, wal); + this.region = TEST_UTIL.createLocalHRegion(info, CONF, tableDescriptor, wal); // Put 4 version to memstore long ts = 0; @@ -5405,7 +5408,7 @@ public class TestHRegion { final WALFactory wals = new WALFactory(walConf, HBaseTestingUtility.getRandomUUID().toString()); final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build())); this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW, false, tableDurability, wal, + HConstants.EMPTY_END_ROW, CONF, false, tableDurability, wal, new byte[][] { family }); Put put = new Put(Bytes.toBytes("r1")); @@ -5772,7 +5775,7 @@ public class TestHRegion { RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build(); final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri); - return initHRegion(tableName, startKey, stopKey, isReadOnly, Durability.SYNC_WAL, wal, + return initHRegion(tableName, startKey, stopKey, conf, isReadOnly, Durability.SYNC_WAL, wal, families); } @@ -5781,11 +5784,12 @@ public class TestHRegion { * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. */ public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, - boolean isReadOnly, Durability durability, WAL wal, byte[]... 
families) throws IOException { + Configuration conf, boolean isReadOnly, Durability durability, WAL wal, + byte[]... families) throws IOException { ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, - isReadOnly, durability, wal, families); + conf, isReadOnly, durability, wal, families); } /** @@ -6052,9 +6056,9 @@ public class TestHRegion { byte[] col1 = Bytes.toBytes("col1"); byte[] col2 = Bytes.toBytes("col2"); long ts = 1; - HBaseConfiguration config = new HBaseConfiguration(); - config.setInt("test.block.size", 1); - this.region = initHRegion(tableName, method, config, families); + Configuration conf = new Configuration(CONF); + conf.setInt("test.block.size", 1); + this.region = initHRegion(tableName, method, conf, families); KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null); KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null); KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null); @@ -6132,7 +6136,7 @@ public class TestHRegion { byte[][] families = { cf1, cf2, cf3 }; byte[] col = Bytes.toBytes("C"); long ts = 1; - HBaseConfiguration conf = new HBaseConfiguration(); + Configuration conf = new Configuration(CONF); // disable compactions in this test. conf.setInt("hbase.hstore.compactionThreshold", 10000); this.region = initHRegion(tableName, method, conf, families); @@ -6294,7 +6298,7 @@ public class TestHRegion { byte[][] families = { cf1, cf2, cf3, cf4 }; byte[] col = Bytes.toBytes("C"); long ts = 1; - HBaseConfiguration conf = new HBaseConfiguration(); + Configuration conf = new Configuration(CONF); // disable compactions in this test. 
conf.setInt("hbase.hstore.compactionThreshold", 10000); this.region = initHRegion(tableName, method, conf, families); @@ -6360,7 +6364,7 @@ public class TestHRegion { byte[] cf1 = Bytes.toBytes("CF1"); byte[][] families = {cf1}; byte[] col = Bytes.toBytes("C"); - HBaseConfiguration conf = new HBaseConfiguration(); + Configuration conf = new Configuration(CONF); this.region = initHRegion(tableName, method, conf, families); // setup with one storefile and one memstore, to create scanner and get an earlier readPt Put put = new Put(Bytes.toBytes("19998")); @@ -6409,8 +6413,7 @@ public class TestHRegion { byte[] cf1 = Bytes.toBytes("CF1"); byte[][] families = { cf1 }; byte[] col = Bytes.toBytes("C"); - HBaseConfiguration conf = new HBaseConfiguration(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); // setup with one storefile and one memstore, to create scanner and get an earlier readPt Put put = new Put(Bytes.toBytes("19996")); put.addColumn(cf1, col, Bytes.toBytes("val")); @@ -6462,8 +6465,7 @@ public class TestHRegion { byte[][] families = { cf1 }; byte[] col = Bytes.toBytes("C"); - HBaseConfiguration conf = new HBaseConfiguration(); - this.region = initHRegion(tableName, method, conf, families); + this.region = initHRegion(tableName, method, CONF, families); Put put = new Put(Bytes.toBytes("199996")); put.addColumn(cf1, col, Bytes.toBytes("val")); @@ -7364,4 +7366,226 @@ public class TestHRegion { return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles); } } + + @Test + public void testCloseNoInterrupt() throws Exception { + byte[] cf1 = Bytes.toBytes("CF1"); + byte[][] families = { cf1 }; + final int SLEEP_TIME = 10 * 1000; + + Configuration conf = new Configuration(CONF); + // Disable close thread interrupt and server abort behavior + conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, false); + conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, 1000); + region = 
initHRegion(tableName, method, conf, families); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean holderInterrupted = new AtomicBoolean(); + Thread holder = new Thread(new Runnable() { + @Override + public void run() { + try { + LOG.info("Starting region operation holder"); + region.startRegionOperation(Operation.SCAN); + latch.countDown(); + try { + Thread.sleep(SLEEP_TIME); + } catch (InterruptedException e) { + LOG.info("Interrupted"); + holderInterrupted.set(true); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + try { + region.closeRegionOperation(); + } catch (IOException e) { + } + LOG.info("Stopped region operation holder"); + } + } + }); + + holder.start(); + latch.await(); + region.close(); + region = null; + holder.join(); + + assertFalse("Region lock holder should not have been interrupted", holderInterrupted.get()); + } + + @Test + public void testCloseInterrupt() throws Exception { + byte[] cf1 = Bytes.toBytes("CF1"); + byte[][] families = { cf1 }; + final int SLEEP_TIME = 10 * 1000; + + Configuration conf = new Configuration(CONF); + // Enable close thread interrupt and server abort behavior + conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true); + // Speed up the unit test, no need to wait default 10 seconds. 
+ conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, 1000); + region = initHRegion(tableName, method, conf, families); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean holderInterrupted = new AtomicBoolean(); + Thread holder = new Thread(new Runnable() { + @Override + public void run() { + try { + LOG.info("Starting region operation holder"); + region.startRegionOperation(Operation.SCAN); + latch.countDown(); + try { + Thread.sleep(SLEEP_TIME); + } catch (InterruptedException e) { + LOG.info("Interrupted"); + holderInterrupted.set(true); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + try { + region.closeRegionOperation(); + } catch (IOException e) { + } + LOG.info("Stopped region operation holder"); + } + } + }); + + holder.start(); + latch.await(); + region.close(); + region = null; + holder.join(); + + assertTrue("Region lock holder was not interrupted", holderInterrupted.get()); + } + + @Test + public void testCloseAbort() throws Exception { + byte[] cf1 = Bytes.toBytes("CF1"); + byte[][] families = { cf1 }; + final int SLEEP_TIME = 10 * 1000; + + Configuration conf = new Configuration(CONF); + // Enable close thread interrupt and server abort behavior. + conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true); + // Set the abort interval to a fraction of sleep time so we are guaranteed to be aborted. + conf.setInt(HRegion.CLOSE_WAIT_TIME, SLEEP_TIME / 2); + // Set the wait interval to a fraction of sleep time so we are guaranteed to be interrupted. 
+ conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, SLEEP_TIME / 4); + region = initHRegion(tableName, method, conf, families); + RegionServerServices rsServices = mock(RegionServerServices.class); + when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost", 1000, 1000)); + region.rsServices = rsServices; + + final CountDownLatch latch = new CountDownLatch(1); + Thread holder = new Thread(new Runnable() { + @Override + public void run() { + try { + LOG.info("Starting region operation holder"); + region.startRegionOperation(Operation.SCAN); + latch.countDown(); + // Hold the lock for SLEEP_TIME seconds no matter how many times we are interrupted. + int timeRemaining = SLEEP_TIME; + while (timeRemaining > 0) { + long start = EnvironmentEdgeManager.currentTime(); + try { + Thread.sleep(timeRemaining); + } catch (InterruptedException e) { + LOG.info("Interrupted"); + } + long end = EnvironmentEdgeManager.currentTime(); + timeRemaining -= end - start; + if (timeRemaining < 0) { + timeRemaining = 0; + } + if (timeRemaining > 0) { + LOG.info("Sleeping again, remaining time " + timeRemaining + " ms"); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + try { + region.closeRegionOperation(); + } catch (IOException e) { + } + LOG.info("Stopped region operation holder"); + } + } + }); + + holder.start(); + latch.await(); + try { + region.close(); + } catch (IOException e) { + LOG.info("Caught expected exception", e); + } + region = null; + holder.join(); + + // Verify the region tried to abort the server + verify(rsServices, atLeast(1)).abort(anyString(),any()); + } + + @Test + public void testInterruptProtection() throws Exception { + byte[] cf1 = Bytes.toBytes("CF1"); + byte[][] families = { cf1 }; + final int SLEEP_TIME = 10 * 1000; + + Configuration conf = new Configuration(CONF); + // Enable close thread interrupt and server abort behavior. 
+ conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true); + conf.setInt(HRegion.CLOSE_WAIT_INTERVAL, 1000); + region = initHRegion(tableName, method, conf, families); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean holderInterrupted = new AtomicBoolean(); + Thread holder = new Thread(new Runnable() { + @Override + public void run() { + try { + LOG.info("Starting region operation holder"); + region.startRegionOperation(Operation.SCAN); + LOG.info("Protecting against interrupts"); + region.disableInterrupts(); + try { + latch.countDown(); + try { + Thread.sleep(SLEEP_TIME); + } catch (InterruptedException e) { + LOG.info("Interrupted"); + holderInterrupted.set(true); + } + } finally { + region.enableInterrupts(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + try { + region.closeRegionOperation(); + } catch (IOException e) { + } + LOG.info("Stopped region operation holder"); + } + } + }); + + holder.start(); + latch.await(); + region.close(); + region = null; + holder.join(); + + assertFalse("Region lock holder should not have been interrupted", holderInterrupted.get()); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index a8c12052bb1..9ecdc455f5f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -1709,6 +1709,6 @@ public class TestHRegionReplayEvents { private static HRegion initHRegion(byte[] tableName, byte[]... 
families) throws IOException { return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_START_ROW, - HConstants.EMPTY_END_ROW, false, Durability.SYNC_WAL, null, families); + HConstants.EMPTY_END_ROW, CONF, false, Durability.SYNC_WAL, null, families); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java index 59a0741721b..e64994aa310 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; @@ -51,7 +53,8 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion { */ @Override public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, - boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException { + Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... 
families) + throws IOException { boolean[] inMemory = new boolean[families.length]; for(int i = 0; i < inMemory.length; i++) { inMemory[i] = true; @@ -59,7 +62,7 @@ public class TestHRegionWithInMemoryFlush extends TestHRegion { ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); return TEST_UTIL.createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, - isReadOnly, durability, wal, inMemory, families); + conf, isReadOnly, durability, wal, inMemory, families); } @Override int getTestCountForTestWritesWhileScanning() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java index 710042e9c27..4792869b2f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java @@ -87,7 +87,7 @@ public class TestRegionIncrement { ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY, false, Durability.SKIP_WAL, wal, INCREMENT_BYTES); + HConstants.EMPTY_BYTE_ARRAY, conf, false, Durability.SKIP_WAL, wal, INCREMENT_BYTES); } private void closeRegion(final HRegion region) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionInterrupt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionInterrupt.java new file mode 100644 index 00000000000..10fa0b9af75 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionInterrupt.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNameTestRule; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Result; 
+import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({RegionServerTests.class, LargeTests.class}) +public class TestRegionInterrupt { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionInterrupt.class); + + private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final Logger LOG = LoggerFactory.getLogger(TestRegionInterrupt.class); + + static final byte[] FAMILY = Bytes.toBytes("info"); + + static long sleepTime; + + @Rule + public TableNameTestRule name = new TableNameTestRule(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class, Region.class); + 
conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true); + // Ensure the sleep interval is long enough for interrupts to occur. + long waitInterval = conf.getLong(HRegion.CLOSE_WAIT_INTERVAL, + HRegion.DEFAULT_CLOSE_WAIT_INTERVAL); + sleepTime = waitInterval * 2; + // Try to bound the running time of this unit if expected actions do not take place. + conf.setLong(HRegion.CLOSE_WAIT_TIME, sleepTime * 2); + } + + @Before + public void setUp() throws Exception { + TEST_UTIL.startMiniCluster(); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testCloseInterruptScanning() throws Exception { + final TableName tableName = name.getTableName(); + LOG.info("Creating table " + tableName); + try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { + // load some data + TEST_UTIL.waitUntilAllRegionsAssigned(tableName); + TEST_UTIL.loadTable(table, FAMILY); + final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false); + // scan the table in the background + Thread scanner = new Thread(new Runnable() { + @Override + public void run() { + Scan scan = new Scan(); + scan.addFamily(FAMILY); + scan.setFilter(new DelayingFilter()); + try { + LOG.info("Starting scan"); + try (ResultScanner rs = table.getScanner(scan)) { + Result r; + do { + r = rs.next(); + if (r != null) { + LOG.info("Scanned row " + Bytes.toStringBinary(r.getRow())); + } + } while (r != null); + } + } catch (IOException e) { + LOG.info("Scanner caught exception", e); + expectedExceptionCaught.set(true); + } finally { + LOG.info("Finished scan"); + } + } + }); + scanner.start(); + + // Wait for the filter to begin sleeping + LOG.info("Waiting for scanner to start"); + Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return DelayingFilter.isSleeping(); + } + }); + + // Offline the table, this will trigger closing + LOG.info("Offlining table " + 
tableName); + TEST_UTIL.getAdmin().disableTable(tableName); + + // Wait for scanner termination + scanner.join(); + + // When we get here the region has closed and the table is offline + assertTrue("Region operations were not interrupted", + InterruptInterceptingHRegion.wasInterrupted()); + assertTrue("Scanner did not catch expected exception", expectedExceptionCaught.get()); + } + } + + @Test + public void testCloseInterruptMutation() throws Exception { + final TableName tableName = name.getTableName(); + final Admin admin = TEST_UTIL.getAdmin(); + // Create the test table + TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .setCoprocessor(MutationDelayingCoprocessor.class.getName()) + .build(); + LOG.info("Creating table " + tableName); + admin.createTable(htd); + TEST_UTIL.waitUntilAllRegionsAssigned(tableName); + + // Insert some data in the background + LOG.info("Starting writes to table " + tableName); + final int NUM_ROWS = 100; + final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false); + Thread inserter = new Thread(new Runnable() { + @Override + public void run() { + try (BufferedMutator t = admin.getConnection().getBufferedMutator(tableName)) { + for (int i = 0; i < NUM_ROWS; i++) { + LOG.info("Writing row " + i + " to " + tableName); + byte[] value = new byte[10], row = Bytes.toBytes(Integer.toString(i)); + Bytes.random(value); + t.mutate(new Put(row).addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, value)); + t.flush(); + } + } catch (IOException e) { + LOG.info("Inserter caught exception", e); + expectedExceptionCaught.set(true); + } + } + }); + inserter.start(); + + // Wait for delayed insertion to begin + LOG.info("Waiting for mutations to start"); + Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return MutationDelayingCoprocessor.isSleeping(); + } + }); + + // 
Offline the table, this will trigger closing + LOG.info("Offlining table " + tableName); + admin.disableTable(tableName); + + // Wait for the inserter to finish + inserter.join(); + + // When we get here the region has closed and the table is offline + assertTrue("Region operations were not interrupted", + InterruptInterceptingHRegion.wasInterrupted()); + assertTrue("Inserter did not catch expected exception", expectedExceptionCaught.get()); + + } + + public static class InterruptInterceptingHRegion extends HRegion { + + private static boolean interrupted = false; + + public static boolean wasInterrupted() { + return interrupted; + } + + public InterruptInterceptingHRegion(Path tableDir, WAL wal, FileSystem fs, + Configuration conf, RegionInfo regionInfo, TableDescriptor htd, + RegionServerServices rsServices) { + super(tableDir, wal, fs, conf, regionInfo, htd, rsServices); + } + + public InterruptInterceptingHRegion(HRegionFileSystem fs, WAL wal, Configuration conf, + TableDescriptor htd, RegionServerServices rsServices) { + super(fs, wal, conf, htd, rsServices); + } + + @Override + void checkInterrupt() throws NotServingRegionException, InterruptedIOException { + try { + super.checkInterrupt(); + } catch (NotServingRegionException | InterruptedIOException e) { + interrupted = true; + throw e; + } + } + + @Override + IOException throwOnInterrupt(Throwable t) { + interrupted = true; + return super.throwOnInterrupt(t); + } + + } + + public static class DelayingFilter extends FilterBase { + + static volatile boolean sleeping = false; + + public static boolean isSleeping() { + return sleeping; + } + + @Override + public ReturnCode filterCell(Cell v) throws IOException { + LOG.info("Starting sleep on " + v); + sleeping = true; + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + // restore interrupt status so region scanner can handle it as expected + Thread.currentThread().interrupt(); + LOG.info("Interrupted during sleep on " + v); + } finally { 
+ LOG.info("Done sleep on " + v); + sleeping = false; + } + return ReturnCode.INCLUDE; + } + + public static DelayingFilter parseFrom(final byte [] pbBytes) + throws DeserializationException { + // Just return a new instance. + return new DelayingFilter(); + } + + } + + public static class MutationDelayingCoprocessor implements RegionCoprocessor, RegionObserver { + + static volatile boolean sleeping = false; + + public static boolean isSleeping() { + return sleeping; + } + + private void doSleep(Region.Operation op) { + LOG.info("Starting sleep for " + op); + sleeping = true; + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + // restore interrupt status so doMiniBatchMutation etc. can handle it as expected + Thread.currentThread().interrupt(); + LOG.info("Interrupted during " + op); + } finally { + LOG.info("Done"); + sleeping = false; + } + } + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void prePut(ObserverContext c, Put put, WALEdit edit, + Durability durability) throws IOException { + doSleep(Region.Operation.PUT); + RegionObserver.super.prePut(c, put, edit, durability); + } + + @Override + public void preDelete(ObserverContext c, Delete delete, + WALEdit edit, Durability durability) throws IOException { + doSleep(Region.Operation.DELETE); + RegionObserver.super.preDelete(c, delete, edit, durability); + } + + @Override + public Result preAppend(ObserverContext c, Append append) + throws IOException { + doSleep(Region.Operation.APPEND); + return RegionObserver.super.preAppend(c, append); + } + + @Override + public Result preIncrement(ObserverContext c, Increment increment) + throws IOException { + doSleep(Region.Operation.INCREMENT); + return RegionObserver.super.preIncrement(c, increment); + } + + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index ce7919e36ee..e850853b60e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -226,7 +226,7 @@ public class TestWALLockup { // There is no 'stop' once a logRoller is running.. it just dies. logRoller.start(); // Now get a region and start adding in edits. - final HRegion region = initHRegion(tableName, null, null, dodgyWAL); + final HRegion region = initHRegion(tableName, null, null, CONF, dodgyWAL); byte [] bytes = Bytes.toBytes(getName()); NavigableMap scopes = new TreeMap<>( Bytes.BYTES_COMPARATOR); @@ -557,11 +557,11 @@ public class TestWALLockup { * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} * when done. */ - private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) - throws IOException { + private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, + Configuration conf, WAL wal) throws IOException { ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); - return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL, - wal, COLUMN_FAMILY_BYTES); + return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, conf, false, + Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index bdc516ce724..8a82848f365 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -563,7 +563,7 
@@ public abstract class AbstractTestFSWAL { RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); - TEST_UTIL.createLocalHRegion(hri, htd, wal).close(); + TEST_UTIL.createLocalHRegion(hri, CONF, htd, wal).close(); RegionServerServices rsServices = mock(RegionServerServices.class); when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456)); when(rsServices.getConfiguration()).thenReturn(conf); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index a655bdaf7c1..e763896d8df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -168,7 +168,7 @@ public class TestFSHLog extends AbstractTestFSWAL { RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build(); ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); - final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log); + final HRegion region = TEST_UTIL.createLocalHRegion(hri, CONF, htd, log); ExecutorService exec = Executors.newFixedThreadPool(2); // do a regular write first because of memstore size calculation. 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java index 2dd948c290d..0daeb13b16e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java @@ -89,7 +89,7 @@ public abstract class WALDurabilityTestBase { FileSystem fs = FileSystem.get(conf); Path rootDir = new Path(dir + getName()); T wal = getWAL(fs, rootDir, getName(), conf); - HRegion region = initHRegion(tableName, null, null, wal); + HRegion region = initHRegion(tableName, null, null, conf, wal); try { resetSyncFlag(wal); assertNull(getSyncFlag(wal)); @@ -114,7 +114,7 @@ public abstract class WALDurabilityTestBase { conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true"); fs = FileSystem.get(conf); wal = getWAL(fs, rootDir, getName(), conf); - region = initHRegion(tableName, null, null, wal); + region = initHRegion(tableName, null, null, conf, wal); try { resetSyncFlag(wal); @@ -156,11 +156,11 @@ public abstract class WALDurabilityTestBase { * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} * when done. */ - public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) - throws IOException { + public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, + Configuration conf, WAL wal) throws IOException { ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); - return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.USE_DEFAULT, - wal, COLUMN_FAMILY_BYTES); + return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, conf, false, + Durability.USE_DEFAULT, wal, COLUMN_FAMILY_BYTES); } }