diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java deleted file mode 100644 index b64ebdfa6f5..00000000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DrainBarrier.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.util; - -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.yetus.audience.InterfaceAudience; - -/** - * A simple barrier that can be used by classes that need to wait for some operations to - * finish before stopping/closing/etc. forever. - */ -@InterfaceAudience.Private -public class DrainBarrier { - /** - * Contains the number of outstanding operations, as well as flags. - * Initially, the number of operations is 1. Each beginOp increments, and endOp decrements it. - * beginOp does not proceed when it sees the draining flag. When stop is called, it atomically - * decrements the number of operations (the initial 1) and sets the draining flag. If stop did - * the decrement to zero, that means there are no more operations outstanding, so stop is done. - * Otherwise, stop blocks, and the endOp that decrements the count to 0 unblocks it. - */ - private final AtomicLong valueAndFlags = new AtomicLong(inc(0)); - private final static long DRAINING_FLAG = 0x1; - private final static int FLAG_BIT_COUNT = 1; - - /** - * Tries to start an operation. - * @return false iff the stop is in progress, and the operation cannot be started. - */ - public boolean beginOp() { - long oldValAndFlags; - do { - oldValAndFlags = valueAndFlags.get(); - if (isDraining(oldValAndFlags)) return false; - } while (!valueAndFlags.compareAndSet(oldValAndFlags, inc(oldValAndFlags))); - return true; - } - - /** - * Ends the operation. Unblocks the blocked caller of stop, if necessary. - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", - justification="First, we do change the state before notify, 2nd, it doesn't even matter") - public void endOp() { - long oldValAndFlags; - do { - oldValAndFlags = valueAndFlags.get(); - long unacceptableCount = isDraining(oldValAndFlags) ? 0 : 1; - if (getValue(oldValAndFlags) == unacceptableCount) { - throw new AssertionError("endOp called without corresponding beginOp call (" - + "the current count is " + unacceptableCount + ")"); - } - } while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags))); - if (getValue(oldValAndFlags) == 1) { - synchronized (this) { this.notifyAll(); } - } - } - - /** - * Blocks new operations from starting, waits for the current ones to drain. - * If someone already called it, returns immediately, which is currently unavoidable as - * most of the users stop and close things right and left, and hope for the best. - * stopAndWaitForOpsOnce asserts instead. - * @throws InterruptedException the wait for operations has been interrupted. - */ - public void stopAndDrainOps() throws InterruptedException { - stopAndDrainOps(true); - } - - /** - * Blocks new operations from starting, waits for the current ones to drain. - * Can only be called once. - * @throws InterruptedException the wait for operations has been interrupted. - */ - public void stopAndDrainOpsOnce() throws InterruptedException { - stopAndDrainOps(false); - } - - /** - * @param ignoreRepeatedCalls If this is true and somebody already called stop, this method - * will return immediately if true; if this is false and somebody - * already called stop, it will assert. - */ - // Justification for warnings - wait is not unconditional, and contrary to what WA_NOT_IN_LOOP - // description says we are not waiting on multiple conditions. - @edu.umd.cs.findbugs.annotations.SuppressWarnings({"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"}) - private void stopAndDrainOps(boolean ignoreRepeatedCalls) throws InterruptedException { - long oldValAndFlags; - do { - oldValAndFlags = valueAndFlags.get(); - if (isDraining(oldValAndFlags)) { - if (ignoreRepeatedCalls) return; - throw new AssertionError("stopAndWaitForOpsOnce called more than once"); - } - } while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags) | DRAINING_FLAG)); - if (getValue(oldValAndFlags) == 1) return; // There were no operations outstanding. - synchronized (this) { this.wait(); } - } - - // Helper methods. - private static final boolean isDraining(long valueAndFlags) { - return (valueAndFlags & DRAINING_FLAG) == DRAINING_FLAG; - } - - private static final long getValue(long valueAndFlags) { - return valueAndFlags >> FLAG_BIT_COUNT; - } - - private static final long inc(long valueAndFlags) { - return valueAndFlags + (1 << FLAG_BIT_COUNT); // Not checking for overflow. - } - - private static final long dec(long valueAndFlags) { - return valueAndFlags - (1 << FLAG_BIT_COUNT); // Negative overflow checked outside. - } -} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java deleted file mode 100644 index 5c3d053bc45..00000000000 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestDrainBarrier.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.util; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.testclassification.MiscTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({MiscTests.class, SmallTests.class}) -public class TestDrainBarrier { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestDrainBarrier.class); - - @Test - public void testBeginEndStopWork() throws Exception { - DrainBarrier barrier = new DrainBarrier(); - assertTrue(barrier.beginOp()); - assertTrue(barrier.beginOp()); - barrier.endOp(); - barrier.endOp(); - barrier.stopAndDrainOps(); - assertFalse(barrier.beginOp()); - } - - @Test - public void testUnmatchedEndAssert() throws Exception { - DrainBarrier barrier = new DrainBarrier(); - try { - barrier.endOp(); - throw new Error("Should have asserted"); - } catch (AssertionError e) { - } - - barrier.beginOp(); - barrier.beginOp(); - barrier.endOp(); - barrier.endOp(); - try { - barrier.endOp(); - throw new Error("Should have asserted"); - } catch (AssertionError e) { - } - } - - @Test - public void testStopWithoutOpsDoesntBlock() throws Exception { - DrainBarrier barrier = new DrainBarrier(); - barrier.stopAndDrainOpsOnce(); - - barrier = new DrainBarrier(); - barrier.beginOp(); - barrier.endOp(); - barrier.stopAndDrainOpsOnce(); - } - - @Test - /** This test tests blocking and can have false positives in very bad timing cases. */ - public void testStopIsBlockedByOps() throws Exception { - final DrainBarrier barrier = new DrainBarrier(); - barrier.beginOp(); - barrier.beginOp(); - barrier.beginOp(); - barrier.endOp(); - - Thread stoppingThread = new Thread(new Runnable() { - @Override - public void run() { - try { - barrier.stopAndDrainOpsOnce(); - } catch (InterruptedException e) { - fail("Should not have happened"); - } - } - }); - stoppingThread.start(); - - // First "end" should not unblock the thread, but the second should. - barrier.endOp(); - stoppingThread.join(1000); - assertTrue(stoppingThread.isAlive()); - barrier.endOp(); - stoppingThread.join(30000); // When not broken, will be a very fast wait; set safe value. - assertFalse(stoppingThread.isAlive()); - } - - @Test - public void testMultipleStopOnceAssert() throws Exception { - DrainBarrier barrier = new DrainBarrier(); - barrier.stopAndDrainOpsOnce(); - try { - barrier.stopAndDrainOpsOnce(); - throw new Error("Should have asserted"); - } catch (AssertionError e) { - } - } - - @Test - public void testMultipleSloppyStopsHaveNoEffect() throws Exception { - DrainBarrier barrier = new DrainBarrier(); - barrier.stopAndDrainOps(); - barrier.stopAndDrainOps(); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3a93c769fb0..0d59b124b5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1181,7 +1181,7 @@ public class HRegionServer extends HasThread implements @VisibleForTesting protected void tryRegionServerReport(long reportStartTime, long reportEndTime) - throws IOException { + throws IOException { RegionServerStatusService.BlockingInterface rss = rssStub; if (rss == null) { // the current server could be stopping. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 7a247cf668e..55c5219ca78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -132,6 +132,23 @@ public class LogRoller extends HasThread implements Closeable { } } + private void abort(String reason, Throwable cause) { + // close all WALs before calling abort on RS. + // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we + // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it + // is already broken. + for (WAL wal : walNeedsRoll.keySet()) { + // shutdown rather than close here since we are going to abort the RS and the wals need to be + // split when recovery + try { + wal.shutdown(); + } catch (IOException e) { + LOG.warn("Failed to shutdown wal", e); + } + } + server.abort(reason, cause); + } + @Override public void run() { while (running) { @@ -153,10 +170,8 @@ public class LogRoller extends HasThread implements Closeable { continue; } // Time for periodic roll - if (LOG.isDebugEnabled()) { - LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed"); - } - } else if (LOG.isDebugEnabled()) { + LOG.debug("Wal roll period {} ms elapsed", this.rollperiod); + } else { LOG.debug("WAL roll requested"); } rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH @@ -170,20 +185,22 @@ public class LogRoller extends HasThread implements Closeable { entry.getValue().booleanValue()); walNeedsRoll.put(wal, Boolean.FALSE); if (regionsToFlush != null) { - for (byte [] r: regionsToFlush) scheduleFlush(r); + for (byte[] r : regionsToFlush) { + scheduleFlush(r); + } } } } catch (FailedLogCloseException e) { - server.abort("Failed log close in log roller", e); + abort("Failed log close in log roller", e); } catch (java.net.ConnectException e) { - server.abort("Failed log close in log roller", e); + abort("Failed log close in log roller", e); } catch (IOException ex) { // Abort if we get here. We probably won't recover an IOE. HBASE-1132 - server.abort("IOE in log roller", + abort("IOE in log roller", ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex); } catch (Exception ex) { LOG.error("Log rolling failed", ex); - server.abort("Log rolling failed", ex); + abort("Log rolling failed", ex); } finally { try { rollLog.set(false); @@ -211,9 +228,8 @@ public class LogRoller extends HasThread implements Closeable { } } if (!scheduled) { - LOG.warn("Failed to schedule flush of " + - Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" + - requester); + LOG.warn("Failed to schedule flush of {}, region={}, requester={}", + Bytes.toString(encodedRegionName), r, requester); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 54a5cd3c17f..14fbe10d52a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -17,12 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; import com.lmax.disruptor.RingBuffer; - import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -46,7 +45,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; - import org.apache.commons.lang3.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -66,7 +64,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; @@ -84,6 +81,7 @@ import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** @@ -173,9 +171,6 @@ public abstract class AbstractFSWAL implements WAL { */ protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); - /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */ - protected final DrainBarrier closeBarrier = new DrainBarrier(); - protected final long slowSyncNs; private final long walSyncTimeoutNs; @@ -452,32 +447,22 @@ public abstract class AbstractFSWAL implements WAL { @Override public Long startCacheFlush(byte[] encodedRegionName, Set families) { - if (!closeBarrier.beginOp()) { - LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); - return null; - } return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); } @Override public Long startCacheFlush(byte[] encodedRegionName, Map familyToSeq) { - if (!closeBarrier.beginOp()) { - LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); - return null; - } return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); } @Override public void completeCacheFlush(byte[] encodedRegionName) { this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); - closeBarrier.endOp(); } @Override public void abortCacheFlush(byte[] encodedRegionName) { this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); - closeBarrier.endOp(); } @Override @@ -715,7 +700,11 @@ public abstract class AbstractFSWAL implements WAL { // Now we have published the ringbuffer, halt the current thread until we get an answer back. try { if (syncFuture != null) { - syncFuture.get(walSyncTimeoutNs); + if (closed) { + throw new IOException("WAL has been closed"); + } else { + syncFuture.get(walSyncTimeoutNs); + } } } catch (TimeoutIOException tioe) { // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer @@ -755,10 +744,6 @@ public abstract class AbstractFSWAL implements WAL { LOG.debug("WAL closed. Skipping rolling of writer"); return regionsToFlush; } - if (!closeBarrier.beginOp()) { - LOG.debug("WAL closing. Skipping rolling of writer"); - return regionsToFlush; - } try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) { Path oldPath = getOldPath(); Path newPath = getNewPath(); @@ -783,8 +768,6 @@ public abstract class AbstractFSWAL implements WAL { throw new IOException( "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.", exception); - } finally { - closeBarrier.endOp(); } return regionsToFlush; } finally { @@ -818,20 +801,18 @@ public abstract class AbstractFSWAL implements WAL { return; } closed = true; - try { - // Prevent all further flushing and rolling. - closeBarrier.stopAndDrainOps(); - } catch (InterruptedException e) { - LOG.error("Exception while waiting for cache flushes and log rolls", e); - Thread.currentThread().interrupt(); - } // Tell our listeners that the log is closing if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { i.logCloseRequested(); } } - doShutdown(); + rollWriterLock.lock(); + try { + doShutdown(); + } finally { + rollWriterLock.unlock(); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index faf3b7747e5..19d89dfec4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -608,19 +608,8 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override protected AsyncWriter createWriterInstance(Path path) throws IOException { - try { - return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup, - channelClass); - } catch (IOException e) { - // this usually means master already think we are dead so let's fail all the pending - // syncs. The shutdown process of RS will wait for all regions to be closed before calling - // WAL.close so if we do not wake up the thread blocked by sync here it will cause dead - // lock. - if (e.getMessage().contains("Parent directory doesn't exist:")) { - syncFutures.forEach(f -> f.done(f.getTxid(), e)); - } - throw e; - } + return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup, + channelClass); } private void waitForSafePoint() { @@ -675,17 +664,34 @@ public class AsyncFSWAL extends AbstractFSWAL { closeExecutor.shutdown(); try { if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { - LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" - + " the close of async writer doesn't complete." - + "Please check the status of underlying filesystem" - + " or increase the wait time by the config \"" - + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + "\""); + LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" + + " the close of async writer doesn't complete." + + "Please check the status of underlying filesystem" + + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + + "\""); } } catch (InterruptedException e) { LOG.error("The wait for close of async writer is interrupted"); Thread.currentThread().interrupt(); } IOException error = new IOException("WAL has been closed"); + long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; + // drain all the pending sync requests + for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; + nextCursor++) { + if (!waitingConsumePayloads.isPublished(nextCursor)) { + break; + } + RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); + switch (truck.type()) { + case SYNC: + syncFutures.add(truck.unloadSync()); + break; + default: + break; + } + } + // and fail them syncFutures.forEach(f -> f.done(f.getTxid(), error)); if (!(consumeExecutor instanceof EventLoop)) { consumeExecutor.shutdown(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShutdownWhileWALBroken.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShutdownWhileWALBroken.java new file mode 100644 index 00000000000..6c9b5e358c8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestShutdownWhileWALBroken.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.YouAreDeadException; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * See HBASE-19929 for more details. + */ +@RunWith(Parameterized.class) +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestShutdownWhileWALBroken { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestShutdownWhileWALBroken.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestShutdownWhileWALBroken.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("TestShutdownWhileWALBroken"); + + private static byte[] CF = Bytes.toBytes("CF"); + + @Parameter + public String walType; + + @Parameters(name = "{index}: WAL={0}") + public static List params() { + return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" }); + } + + public static final class MyRegionServer extends HRegionServer { + + private final CountDownLatch latch = new CountDownLatch(1); + + public MyRegionServer(Configuration conf) throws IOException { + super(conf); + } + + @Override + protected void tryRegionServerReport(long reportStartTime, long reportEndTime) + throws IOException { + try { + super.tryRegionServerReport(reportStartTime, reportEndTime); + } catch (YouAreDeadException e) { + LOG.info("Caught YouAreDeadException, ignore", e); + } + } + + @Override + public void abort(String reason, Throwable cause) { + if (cause instanceof SessionExpiredException) { + // called from ZKWatcher, let's wait a bit to make sure that we call stop before calling + // abort. + try { + latch.await(); + } catch (InterruptedException e) { + } + } else { + // abort from other classes, usually LogRoller, now we can make progress on abort. + latch.countDown(); + } + super.abort(reason, cause); + } + } + + @Before + public void setUp() throws Exception { + UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, MyRegionServer.class, + HRegionServer.class); + UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walType); + UTIL.getConfiguration().set(WALFactory.META_WAL_PROVIDER, walType); + UTIL.startMiniCluster(2); + } + + @After + public void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + UTIL.createMultiRegionTable(TABLE_NAME, CF); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { + UTIL.loadTable(table, CF); + } + int numRegions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).size(); + RegionServerThread rst0 = UTIL.getMiniHBaseCluster().getRegionServerThreads().get(0); + RegionServerThread rst1 = UTIL.getMiniHBaseCluster().getRegionServerThreads().get(1); + HRegionServer liveRS; + RegionServerThread toKillRSThread; + if (rst1.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty()) { + liveRS = rst0.getRegionServer(); + toKillRSThread = rst1; + } else { + liveRS = rst1.getRegionServer(); + toKillRSThread = rst0; + } + assertTrue(liveRS.getRegions(TABLE_NAME).size() < numRegions); + UTIL.expireSession(toKillRSThread.getRegionServer().getZooKeeper(), false); + UTIL.waitFor(30000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return liveRS.getRegions(TABLE_NAME).size() == numRegions; + } + + @Override + public String explainFailure() throws Exception { + return "Failover is not finished yet"; + } + }); + toKillRSThread.getRegionServer().stop("Stop for test"); + // make sure that we can successfully quit + toKillRSThread.join(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 1e59248b363..ca659146186 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -43,9 +42,9 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.regionserver.wal.DamagedWALException; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.Threads; @@ -67,11 +66,12 @@ import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + /** - * Testing for lock up of WAL subsystem. - * Copied from TestHRegion. + * Testing for lock up of FSHLog. */ -@Category({MediumTests.class}) +@Category({ RegionServerTests.class, MediumTests.class }) public class TestWALLockup { @ClassRule @@ -79,14 +79,15 @@ public class TestWALLockup { HBaseClassTestRule.forClass(TestWALLockup.class); private static final Logger LOG = LoggerFactory.getLogger(TestWALLockup.class); - @Rule public TestName name = new TestName(); + + @Rule + public TestName name = new TestName(); private static final String COLUMN_FAMILY = "MyCF"; private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY); HRegion region = null; - // Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack) - private static HBaseTestingUtility TEST_UTIL; + private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static Configuration CONF ; private String dir; @@ -95,7 +96,6 @@ public class TestWALLockup { @Before public void setup() throws IOException { - TEST_UTIL = HBaseTestingUtility.createLocalHTU(); CONF = TEST_UTIL.getConfiguration(); // Disable block cache. CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f); @@ -110,10 +110,90 @@ public class TestWALLockup { TEST_UTIL.cleanupTestDir(); } - String getName() { + private String getName() { return name.getMethodName(); } + // A WAL that we can have throw exceptions when a flag is set. + private static final class DodgyFSLog extends FSHLog { + // Set this when want the WAL to start throwing exceptions. + volatile boolean throwException = false; + + // Latch to hold up processing until after another operation has had time to run. + CountDownLatch latch = new CountDownLatch(1); + + public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) + throws IOException { + super(fs, root, logDir, conf); + } + + @Override + protected void afterCreatingZigZagLatch() { + // If throwException set, then append will throw an exception causing the WAL to be + // rolled. We'll come in here. Hold up processing until a sync can get in before + // the zigzag has time to complete its setup and get its own sync in. This is what causes + // the lock up we've seen in production. + if (throwException) { + try { + LOG.info("LATCHED"); + // So, timing can have it that the test can run and the bad flush below happens + // before we get here. In this case, we'll be stuck waiting on this latch but there + // is nothing in the WAL pipeline to get us to the below beforeWaitOnSafePoint... + // because all WALs have rolled. In this case, just give up on test. + if (!this.latch.await(5, TimeUnit.SECONDS)) { + LOG.warn("GIVE UP! Failed waiting on latch...Test is ABORTED!"); + } + } catch (InterruptedException e) { + } + } + } + + @Override + protected void beforeWaitOnSafePoint() { + if (throwException) { + LOG.info("COUNTDOWN"); + // Don't countdown latch until someone waiting on it otherwise, the above + // afterCreatingZigZagLatch will get to the latch and no one will ever free it and we'll + // be stuck; test won't go down + while (this.latch.getCount() <= 0) + Threads.sleep(1); + this.latch.countDown(); + } + } + + @Override + protected Writer createWriterInstance(Path path) throws IOException { + final Writer w = super.createWriterInstance(path); + return new Writer() { + @Override + public void close() throws IOException { + w.close(); + } + + @Override + public void sync() throws IOException { + if (throwException) { + throw new IOException("FAKE! Failed to replace a bad datanode...SYNC"); + } + w.sync(); + } + + @Override + public void append(Entry entry) throws IOException { + if (throwException) { + throw new IOException("FAKE! Failed to replace a bad datanode...APPEND"); + } + w.append(entry); + } + + @Override + public long getLength() { + return w.getLength(); + } + }; + } + } + /** * Reproduce locking up that happens when we get an inopportune sync during setup for * zigzaglatch wait. See HBASE-14317. If below is broken, we will see this test timeout because @@ -121,89 +201,8 @@ public class TestWALLockup { *

First I need to set up some mocks for Server and RegionServerServices. I also need to * set up a dodgy WAL that will throw an exception when we go to append to it. */ - @Test (timeout=20000) + @Test public void testLockupWhenSyncInMiddleOfZigZagSetup() throws IOException { - // A WAL that we can have throw exceptions when a flag is set. - class DodgyFSLog extends FSHLog { - // Set this when want the WAL to start throwing exceptions. - volatile boolean throwException = false; - - // Latch to hold up processing until after another operation has had time to run. - CountDownLatch latch = new CountDownLatch(1); - - public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) - throws IOException { - super(fs, root, logDir, conf); - } - - @Override - protected void afterCreatingZigZagLatch() { - // If throwException set, then append will throw an exception causing the WAL to be - // rolled. We'll come in here. Hold up processing until a sync can get in before - // the zigzag has time to complete its setup and get its own sync in. This is what causes - // the lock up we've seen in production. - if (throwException) { - try { - LOG.info("LATCHED"); - // So, timing can have it that the test can run and the bad flush below happens - // before we get here. In this case, we'll be stuck waiting on this latch but there - // is nothing in the WAL pipeline to get us to the below beforeWaitOnSafePoint... - // because all WALs have rolled. In this case, just give up on test. - if (!this.latch.await(5, TimeUnit.SECONDS)) { - LOG.warn("GIVE UP! Failed waiting on latch...Test is ABORTED!"); - } - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - } - - @Override - protected void beforeWaitOnSafePoint() { - if (throwException) { - LOG.info("COUNTDOWN"); - // Don't countdown latch until someone waiting on it otherwise, the above - // afterCreatingZigZagLatch will get to the latch and no one will ever free it and we'll - // be stuck; test won't go down - while (this.latch.getCount() <= 0) Threads.sleep(1); - this.latch.countDown(); - } - } - - @Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); - return new Writer() { - @Override - public void close() throws IOException { - w.close(); - } - - @Override - public void sync() throws IOException { - if (throwException) { - throw new IOException("FAKE! Failed to replace a bad datanode...SYNC"); - } - w.sync(); - } - - @Override - public void append(Entry entry) throws IOException { - if (throwException) { - throw new IOException("FAKE! Failed to replace a bad datanode...APPEND"); - } - w.append(entry); - } - - @Override - public long getLength() { - return w.getLength(); - } - }; - } - } - // Mocked up server and regionserver services. Needed below. Server server = Mockito.mock(Server.class); Mockito.when(server.getConfiguration()).thenReturn(CONF); @@ -222,7 +221,6 @@ public class TestWALLockup { // There is no 'stop' once a logRoller is running.. it just dies. logRoller.start(); // Now get a region and start adding in edits. - HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME); final HRegion region = initHRegion(tableName, null, null, dodgyWAL); byte [] bytes = Bytes.toBytes(getName()); NavigableMap scopes = new TreeMap<>( @@ -236,7 +234,7 @@ public class TestWALLockup { Put put = new Put(bytes); put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); WALKeyImpl key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(), - htd.getTableName(), System.currentTimeMillis(), mvcc, scopes); + TableName.META_TABLE_NAME, System.currentTimeMillis(), mvcc, scopes); WALEdit edit = new WALEdit(); CellScanner CellScanner = put.cellScanner(); assertTrue(CellScanner.advance()); @@ -281,7 +279,9 @@ public class TestWALLockup { t.setDaemon(true); t.start(); // Wait until - while (dodgyWAL.latch.getCount() > 0) Threads.sleep(1); + while (dodgyWAL.latch.getCount() > 0) { + Threads.sleep(1); + } // Now assert I got a new WAL file put in place even though loads of errors above. assertTrue(originalWAL != dodgyWAL.getCurrentFileName()); // Can I append to it? @@ -294,203 +294,13 @@ public class TestWALLockup { } finally { // To stop logRoller, its server has to say it is stopped. Mockito.when(server.isStopped()).thenReturn(true); - if (logRoller != null) logRoller.close(); - try { - if (region != null) region.close(); - if (dodgyWAL != null) dodgyWAL.close(); - } catch (Exception e) { - LOG.info("On way out", e); - } - } - } - - /** - * Reproduce locking up that happens when there's no further syncs after - * append fails, and causing an isolated sync then infinite wait. See - * HBASE-16960. If below is broken, we will see this test timeout because it - * is locked up. - *

- * Steps for reproduce:
- * 1. Trigger server abort through dodgyWAL1
- * 2. Add a {@link DummyWALActionsListener} to dodgyWAL2 to cause ringbuffer - * event handler thread sleep for a while thus keeping {@code endOfBatch} - * false
- * 3. Publish a sync then an append which will throw exception, check whether - * the sync could return - */ - @Test(timeout = 20000) - public void testLockup16960() throws IOException { - // A WAL that we can have throw exceptions when a flag is set. - class DodgyFSLog extends FSHLog { - // Set this when want the WAL to start throwing exceptions. - volatile boolean throwException = false; - - public DodgyFSLog(FileSystem fs, Path root, String logDir, - Configuration conf) throws IOException { - super(fs, root, logDir, conf); - } - - @Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); - return new Writer() { - @Override - public void close() throws IOException { - w.close(); - } - - @Override - public void sync() throws IOException { - if (throwException) { - throw new IOException( - "FAKE! Failed to replace a bad datanode...SYNC"); - } - w.sync(); - } - - @Override - public void append(Entry entry) throws IOException { - if (throwException) { - throw new IOException( - "FAKE! Failed to replace a bad datanode...APPEND"); - } - w.append(entry); - } - - @Override - public long getLength() { - return w.getLength(); - } - }; - } - - @Override - protected long doReplaceWriter(Path oldPath, Path newPath, - Writer nextWriter) throws IOException { - if (throwException) { - throw new FailedLogCloseException("oldPath=" + oldPath + ", newPath=" - + newPath); - } - long oldFileLen = 0L; - oldFileLen = super.doReplaceWriter(oldPath, newPath, nextWriter); - return oldFileLen; - } - } - - // Mocked up server and regionserver services. Needed below. - Server server = new DummyServer(CONF, ServerName.valueOf( - "hostname1.example.org", 1234, 1L).toString()); - RegionServerServices services = Mockito.mock(RegionServerServices.class); - - CONF.setLong("hbase.regionserver.hlog.sync.timeout", 10000); - - // OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL, - // go ahead with test. - FileSystem fs = FileSystem.get(CONF); - Path rootDir = new Path(dir + getName()); - DodgyFSLog dodgyWAL1 = new DodgyFSLog(fs, rootDir, getName(), CONF); - - Path rootDir2 = new Path(dir + getName() + "2"); - final DodgyFSLog dodgyWAL2 = new DodgyFSLog(fs, rootDir2, getName() + "2", - CONF); - // Add a listener to force ringbuffer event handler sleep for a while - dodgyWAL2.registerWALActionsListener(new DummyWALActionsListener()); - - // I need a log roller running. - LogRoller logRoller = new LogRoller(server, services); - logRoller.addWAL(dodgyWAL1); - logRoller.addWAL(dodgyWAL2); - // There is no 'stop' once a logRoller is running.. it just dies. - logRoller.start(); - // Now get a region and start adding in edits. - HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME); - final HRegion region = initHRegion(tableName, null, null, dodgyWAL1); - byte[] bytes = Bytes.toBytes(getName()); - NavigableMap scopes = new TreeMap<>( - Bytes.BYTES_COMPARATOR); - scopes.put(COLUMN_FAMILY_BYTES, 0); - MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - try { - Put put = new Put(bytes); - put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); - WALKeyImpl key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(), - htd.getTableName(), System.currentTimeMillis(), mvcc, scopes); - WALEdit edit = new WALEdit(); - CellScanner CellScanner = put.cellScanner(); - assertTrue(CellScanner.advance()); - edit.add(CellScanner.current()); - - LOG.info("SET throwing of exception on append"); - dodgyWAL1.throwException = true; - // This append provokes a WAL roll request - dodgyWAL1.append(region.getRegionInfo(), key, edit, true); - boolean exception = false; - try { - dodgyWAL1.sync(); - } catch (Exception e) { - exception = true; - } - assertTrue("Did not get sync exception", exception); - - // LogRoller call dodgyWAL1.rollWriter get FailedLogCloseException and - // cause server abort. - try { - // wait LogRoller exit. - Thread.sleep(50); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - final CountDownLatch latch = new CountDownLatch(1); - - // make RingBufferEventHandler sleep 1s, so the following sync - // endOfBatch=false - key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(), - TableName.valueOf("sleep"), System.currentTimeMillis(), mvcc, scopes); - dodgyWAL2.append(region.getRegionInfo(), key, edit, true); - - Thread t = new Thread("Sync") { - @Override - public void run() { - try { - dodgyWAL2.sync(); - } catch (IOException e) { - LOG.info("In sync", e); - } - latch.countDown(); - LOG.info("Sync exiting"); - } - }; - t.setDaemon(true); - t.start(); - try { - // make sure sync have published. - Thread.sleep(100); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - // make append throw DamagedWALException - key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(), - TableName.valueOf("DamagedWALException"), System.currentTimeMillis(), mvcc, scopes); - dodgyWAL2.append(region.getRegionInfo(), key, edit, true); - - while (latch.getCount() > 0) { - Threads.sleep(100); - } - assertTrue(server.isAborted()); - } finally { - if (logRoller != null) { - logRoller.close(); - } + Closeables.close(logRoller, true); try { if (region != null) { region.close(); } - if (dodgyWAL1 != null) { - dodgyWAL1.close(); - } - if (dodgyWAL2 != null) { - dodgyWAL2.close(); + if (dodgyWAL != null) { + dodgyWAL.close(); } } catch (Exception e) { LOG.info("On way out", e); @@ -606,11 +416,11 @@ public class TestWALLockup { } /** - * @return A region on which you must call - * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. + * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} + * when done. */ - public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) - throws IOException { + private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal) + throws IOException { ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);