HBASE-19929 Call RS.stop on a session expired RS may hang
parent d8b999e695
commit dcbb331792
@@ -1,134 +0,0 @@ org.apache.hadoop.hbase.util.DrainBarrier (file deleted)
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.util;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * A simple barrier that can be used by classes that need to wait for some operations to
- * finish before stopping/closing/etc. forever.
- */
-@InterfaceAudience.Private
-public class DrainBarrier {
-  /**
-   * Contains the number of outstanding operations, as well as flags.
-   * Initially, the number of operations is 1. Each beginOp increments, and endOp decrements it.
-   * beginOp does not proceed when it sees the draining flag. When stop is called, it atomically
-   * decrements the number of operations (the initial 1) and sets the draining flag. If stop did
-   * the decrement to zero, that means there are no more operations outstanding, so stop is done.
-   * Otherwise, stop blocks, and the endOp that decrements the count to 0 unblocks it.
-   */
-  private final AtomicLong valueAndFlags = new AtomicLong(inc(0));
-  private final static long DRAINING_FLAG = 0x1;
-  private final static int FLAG_BIT_COUNT = 1;
-
-  /**
-   * Tries to start an operation.
-   * @return false iff the stop is in progress, and the operation cannot be started.
-   */
-  public boolean beginOp() {
-    long oldValAndFlags;
-    do {
-      oldValAndFlags = valueAndFlags.get();
-      if (isDraining(oldValAndFlags)) return false;
-    } while (!valueAndFlags.compareAndSet(oldValAndFlags, inc(oldValAndFlags)));
-    return true;
-  }
-
-  /**
-   * Ends the operation. Unblocks the blocked caller of stop, if necessary.
-   */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
-      justification="First, we do change the state before notify, 2nd, it doesn't even matter")
-  public void endOp() {
-    long oldValAndFlags;
-    do {
-      oldValAndFlags = valueAndFlags.get();
-      long unacceptableCount = isDraining(oldValAndFlags) ? 0 : 1;
-      if (getValue(oldValAndFlags) == unacceptableCount) {
-        throw new AssertionError("endOp called without corresponding beginOp call ("
-          + "the current count is " + unacceptableCount + ")");
-      }
-    } while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags)));
-    if (getValue(oldValAndFlags) == 1) {
-      synchronized (this) { this.notifyAll(); }
-    }
-  }
-
-  /**
-   * Blocks new operations from starting, waits for the current ones to drain.
-   * If someone already called it, returns immediately, which is currently unavoidable as
-   * most of the users stop and close things right and left, and hope for the best.
-   * stopAndWaitForOpsOnce asserts instead.
-   * @throws InterruptedException the wait for operations has been interrupted.
-   */
-  public void stopAndDrainOps() throws InterruptedException {
-    stopAndDrainOps(true);
-  }
-
-  /**
-   * Blocks new operations from starting, waits for the current ones to drain.
-   * Can only be called once.
-   * @throws InterruptedException the wait for operations has been interrupted.
-   */
-  public void stopAndDrainOpsOnce() throws InterruptedException {
-    stopAndDrainOps(false);
-  }
-
-  /**
-   * @param ignoreRepeatedCalls If this is true and somebody already called stop, this method
-   *                            will return immediately if true; if this is false and somebody
-   *                            already called stop, it will assert.
-   */
-  // Justification for warnings - wait is not unconditional, and contrary to what WA_NOT_IN_LOOP
-  // description says we are not waiting on multiple conditions.
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"})
-  private void stopAndDrainOps(boolean ignoreRepeatedCalls) throws InterruptedException {
-    long oldValAndFlags;
-    do {
-      oldValAndFlags = valueAndFlags.get();
-      if (isDraining(oldValAndFlags)) {
-        if (ignoreRepeatedCalls) return;
-        throw new AssertionError("stopAndWaitForOpsOnce called more than once");
-      }
-    } while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags) | DRAINING_FLAG));
-    if (getValue(oldValAndFlags) == 1) return; // There were no operations outstanding.
-    synchronized (this) { this.wait(); }
-  }
-
-  // Helper methods.
-  private static final boolean isDraining(long valueAndFlags) {
-    return (valueAndFlags & DRAINING_FLAG) == DRAINING_FLAG;
-  }
-
-  private static final long getValue(long valueAndFlags) {
-    return valueAndFlags >> FLAG_BIT_COUNT;
-  }
-
-  private static final long inc(long valueAndFlags) {
-    return valueAndFlags + (1 << FLAG_BIT_COUNT); // Not checking for overflow.
-  }
-
-  private static final long dec(long valueAndFlags) {
-    return valueAndFlags - (1 << FLAG_BIT_COUNT); // Negative overflow checked outside.
-  }
-}
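The class removed above is a small counting barrier: each in-flight operation brackets itself with beginOp()/endOp(), and a single closer calls stopAndDrainOps() to block new operations and wait for the outstanding ones. The sketch below is a minimal illustration of that calling pattern as AbstractFSWAL used it before this commit; it assumes the DrainBarrier source shown above is on the classpath, and the flusher thread and sleep are illustrative, not part of the patch.

```java
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.util.DrainBarrier;

public class DrainBarrierUsageSketch {
  public static void main(String[] args) throws Exception {
    DrainBarrier closeBarrier = new DrainBarrier();

    // An "operation" (e.g. a cache flush or a log roll) brackets its work with beginOp/endOp.
    Thread flusher = new Thread(() -> {
      if (!closeBarrier.beginOp()) {
        return; // stop is already in progress, refuse to start
      }
      try {
        TimeUnit.MILLISECONDS.sleep(200); // pretend to flush
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        closeBarrier.endOp(); // wakes a blocked stopAndDrainOps() if this was the last operation
      }
    });
    flusher.start();

    // The closer blocks here until the in-flight operation above has called endOp().
    closeBarrier.stopAndDrainOps();
    System.out.println("drained; beginOp now returns " + closeBarrier.beginOp()); // false
  }
}
```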
@@ -1,127 +0,0 @@ org.apache.hadoop.hbase.util.TestDrainBarrier (file deleted)
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.util;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({MiscTests.class, SmallTests.class})
-public class TestDrainBarrier {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestDrainBarrier.class);
-
-  @Test
-  public void testBeginEndStopWork() throws Exception {
-    DrainBarrier barrier = new DrainBarrier();
-    assertTrue(barrier.beginOp());
-    assertTrue(barrier.beginOp());
-    barrier.endOp();
-    barrier.endOp();
-    barrier.stopAndDrainOps();
-    assertFalse(barrier.beginOp());
-  }
-
-  @Test
-  public void testUnmatchedEndAssert() throws Exception {
-    DrainBarrier barrier = new DrainBarrier();
-    try {
-      barrier.endOp();
-      throw new Error("Should have asserted");
-    } catch (AssertionError e) {
-    }
-
-    barrier.beginOp();
-    barrier.beginOp();
-    barrier.endOp();
-    barrier.endOp();
-    try {
-      barrier.endOp();
-      throw new Error("Should have asserted");
-    } catch (AssertionError e) {
-    }
-  }
-
-  @Test
-  public void testStopWithoutOpsDoesntBlock() throws Exception {
-    DrainBarrier barrier = new DrainBarrier();
-    barrier.stopAndDrainOpsOnce();
-
-    barrier = new DrainBarrier();
-    barrier.beginOp();
-    barrier.endOp();
-    barrier.stopAndDrainOpsOnce();
-  }
-
-  @Test
-  /** This test tests blocking and can have false positives in very bad timing cases. */
-  public void testStopIsBlockedByOps() throws Exception {
-    final DrainBarrier barrier = new DrainBarrier();
-    barrier.beginOp();
-    barrier.beginOp();
-    barrier.beginOp();
-    barrier.endOp();
-
-    Thread stoppingThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          barrier.stopAndDrainOpsOnce();
-        } catch (InterruptedException e) {
-          fail("Should not have happened");
-        }
-      }
-    });
-    stoppingThread.start();
-
-    // First "end" should not unblock the thread, but the second should.
-    barrier.endOp();
-    stoppingThread.join(1000);
-    assertTrue(stoppingThread.isAlive());
-    barrier.endOp();
-    stoppingThread.join(30000); // When not broken, will be a very fast wait; set safe value.
-    assertFalse(stoppingThread.isAlive());
-  }
-
-  @Test
-  public void testMultipleStopOnceAssert() throws Exception {
-    DrainBarrier barrier = new DrainBarrier();
-    barrier.stopAndDrainOpsOnce();
-    try {
-      barrier.stopAndDrainOpsOnce();
-      throw new Error("Should have asserted");
-    } catch (AssertionError e) {
-    }
-  }
-
-  @Test
-  public void testMultipleSloppyStopsHaveNoEffect() throws Exception {
-    DrainBarrier barrier = new DrainBarrier();
-    barrier.stopAndDrainOps();
-    barrier.stopAndDrainOps();
-  }
-}
@@ -1181,7 +1181,7 @@ public class HRegionServer extends HasThread implements
 
   @VisibleForTesting
   protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
       throws IOException {
     RegionServerStatusService.BlockingInterface rss = rssStub;
     if (rss == null) {
       // the current server could be stopping.
@@ -132,6 +132,23 @@ public class LogRoller extends HasThread implements Closeable {
     }
   }
 
+  private void abort(String reason, Throwable cause) {
+    // close all WALs before calling abort on RS.
+    // This is because AsyncFSWAL relies on us for rolling a new writer to make progress, and if we
+    // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
+    // is already broken.
+    for (WAL wal : walNeedsRoll.keySet()) {
+      // shutdown rather than close here since we are going to abort the RS and the wals need to be
+      // split during recovery
+      try {
+        wal.shutdown();
+      } catch (IOException e) {
+        LOG.warn("Failed to shutdown wal", e);
+      }
+    }
+    server.abort(reason, cause);
+  }
+
   @Override
   public void run() {
     while (running) {
@@ -153,10 +170,8 @@ public class LogRoller extends HasThread implements Closeable {
           continue;
         }
         // Time for periodic roll
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
-        }
-      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("Wal roll period {} ms elapsed", this.rollperiod);
+      } else {
         LOG.debug("WAL roll requested");
       }
       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
@@ -170,20 +185,22 @@ public class LogRoller extends HasThread implements Closeable {
             entry.getValue().booleanValue());
           walNeedsRoll.put(wal, Boolean.FALSE);
           if (regionsToFlush != null) {
-            for (byte [] r: regionsToFlush) scheduleFlush(r);
+            for (byte[] r : regionsToFlush) {
+              scheduleFlush(r);
+            }
           }
         }
       } catch (FailedLogCloseException e) {
-        server.abort("Failed log close in log roller", e);
+        abort("Failed log close in log roller", e);
       } catch (java.net.ConnectException e) {
-        server.abort("Failed log close in log roller", e);
+        abort("Failed log close in log roller", e);
       } catch (IOException ex) {
         // Abort if we get here. We probably won't recover an IOE. HBASE-1132
-        server.abort("IOE in log roller",
+        abort("IOE in log roller",
           ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
       } catch (Exception ex) {
         LOG.error("Log rolling failed", ex);
-        server.abort("Log rolling failed", ex);
+        abort("Log rolling failed", ex);
       } finally {
         try {
           rollLog.set(false);
@@ -211,9 +228,8 @@ public class LogRoller extends HasThread implements Closeable {
       }
     }
     if (!scheduled) {
-      LOG.warn("Failed to schedule flush of " +
-        Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
-        requester);
+      LOG.warn("Failed to schedule flush of {}, region={}, requester={}",
+        Bytes.toString(encodedRegionName), r, requester);
     }
   }
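The abort(String, Throwable) method added to LogRoller above shuts down every registered WAL before delegating to server.abort(...), because a broken AsyncFSWAL can only unblock its waiters once it is closed, and the server-side abort may otherwise wait forever on those same threads. The sketch below shows the general "release the blocking resource, then abort" shape with plain JDK types; the class and field names are illustrative and not taken from the patch.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class AbortAfterShutdownSketch {
  // stand-ins for the roller's registered WALs
  private final List<Closeable> wals = new CopyOnWriteArrayList<>();

  void abort(String reason, Throwable cause) {
    // First release anything that blocked threads depend on, so the abort cannot dead-lock on it.
    for (Closeable wal : wals) {
      try {
        wal.close(); // stand-in for wal.shutdown(); wakes up anyone stuck on this WAL
      } catch (IOException e) {
        System.err.println("Failed to shut down WAL: " + e.getMessage());
      }
    }
    // Only now hand control to the server-level abort, which may wait for regions to close.
    System.out.println("server.abort(" + reason + ", " + cause + ")");
  }

  public static void main(String[] args) {
    AbortAfterShutdownSketch roller = new AbortAfterShutdownSketch();
    roller.wals.add(() -> System.out.println("WAL shut down"));
    roller.abort("Failed log close in log roller", new IOException("boom"));
  }
}
```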
@@ -17,12 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
 import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
-
 import com.lmax.disruptor.RingBuffer;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -46,7 +45,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -66,7 +64,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.DrainBarrier;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -84,6 +81,7 @@ import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -173,9 +171,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    */
   protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
 
-  /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
-  protected final DrainBarrier closeBarrier = new DrainBarrier();
-
   protected final long slowSyncNs;
 
   private final long walSyncTimeoutNs;
@@ -452,32 +447,22 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   @Override
   public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
-    if (!closeBarrier.beginOp()) {
-      LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
-      return null;
-    }
     return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
   }
 
   @Override
   public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
-    if (!closeBarrier.beginOp()) {
-      LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
-      return null;
-    }
     return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
   }
 
   @Override
   public void completeCacheFlush(byte[] encodedRegionName) {
     this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
-    closeBarrier.endOp();
   }
 
   @Override
   public void abortCacheFlush(byte[] encodedRegionName) {
     this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
-    closeBarrier.endOp();
   }
 
   @Override
@@ -715,7 +700,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     // Now we have published the ringbuffer, halt the current thread until we get an answer back.
     try {
       if (syncFuture != null) {
-        syncFuture.get(walSyncTimeoutNs);
+        if (closed) {
+          throw new IOException("WAL has been closed");
+        } else {
+          syncFuture.get(walSyncTimeoutNs);
+        }
       }
     } catch (TimeoutIOException tioe) {
       // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
@@ -755,10 +744,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       LOG.debug("WAL closed. Skipping rolling of writer");
       return regionsToFlush;
     }
-    if (!closeBarrier.beginOp()) {
-      LOG.debug("WAL closing. Skipping rolling of writer");
-      return regionsToFlush;
-    }
     try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
       Path oldPath = getOldPath();
       Path newPath = getNewPath();
@@ -783,8 +768,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
         throw new IOException(
           "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
           exception);
-      } finally {
-        closeBarrier.endOp();
       }
       return regionsToFlush;
     } finally {
@@ -818,20 +801,18 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       return;
     }
     closed = true;
-    try {
-      // Prevent all further flushing and rolling.
-      closeBarrier.stopAndDrainOps();
-    } catch (InterruptedException e) {
-      LOG.error("Exception while waiting for cache flushes and log rolls", e);
-      Thread.currentThread().interrupt();
-    }
     // Tell our listeners that the log is closing
     if (!this.listeners.isEmpty()) {
       for (WALActionsListener i : this.listeners) {
         i.logCloseRequested();
       }
     }
-    doShutdown();
+    rollWriterLock.lock();
+    try {
+      doShutdown();
+    } finally {
+      rollWriterLock.unlock();
+    }
   }
 
   @Override
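With the DrainBarrier field removed, AbstractFSWAL.shutdown() above now serializes against log rolling by taking rollWriterLock around doShutdown(), and a roll that starts after close simply sees closed == true and returns. The sketch below shows that lock-plus-flag shape in isolation; it is a minimal illustration using only the JDK, and while the names mirror the diff the class itself is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockGuardedShutdownSketch {
  private final ReentrantLock rollWriterLock = new ReentrantLock();
  private volatile boolean closed = false;

  void rollWriter() {
    rollWriterLock.lock();
    try {
      if (closed) {
        System.out.println("WAL closed. Skipping rolling of writer");
        return;
      }
      System.out.println("rolling writer");
    } finally {
      rollWriterLock.unlock();
    }
  }

  void shutdown() {
    closed = true;
    rollWriterLock.lock(); // waits for an in-flight roll instead of draining a barrier
    try {
      System.out.println("doShutdown()");
    } finally {
      rollWriterLock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    LockGuardedShutdownSketch wal = new LockGuardedShutdownSketch();
    Thread roller = new Thread(wal::rollWriter);
    roller.start();
    roller.join();
    wal.shutdown();
    wal.rollWriter(); // now a no-op because closed == true
  }
}
```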
@@ -608,19 +608,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   @Override
   protected AsyncWriter createWriterInstance(Path path) throws IOException {
-    try {
-      return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup,
-        channelClass);
-    } catch (IOException e) {
-      // this usually means master already think we are dead so let's fail all the pending
-      // syncs. The shutdown process of RS will wait for all regions to be closed before calling
-      // WAL.close so if we do not wake up the thread blocked by sync here it will cause dead
-      // lock.
-      if (e.getMessage().contains("Parent directory doesn't exist:")) {
-        syncFutures.forEach(f -> f.done(f.getTxid(), e));
-      }
-      throw e;
-    }
+    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup,
+      channelClass);
   }
 
   private void waitForSafePoint() {
@@ -675,17 +664,34 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     closeExecutor.shutdown();
     try {
       if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
-        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
-          + " the close of async writer doesn't complete."
-          + "Please check the status of underlying filesystem"
-          + " or increase the wait time by the config \""
-          + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + "\"");
+        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" +
+          " the close of async writer doesn't complete." +
+          "Please check the status of underlying filesystem" +
+          " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS +
+          "\"");
       }
     } catch (InterruptedException e) {
       LOG.error("The wait for close of async writer is interrupted");
       Thread.currentThread().interrupt();
     }
     IOException error = new IOException("WAL has been closed");
+    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
+    // drain all the pending sync requests
+    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound;
+      nextCursor++) {
+      if (!waitingConsumePayloads.isPublished(nextCursor)) {
+        break;
+      }
+      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
+      switch (truck.type()) {
+        case SYNC:
+          syncFutures.add(truck.unloadSync());
+          break;
+        default:
+          break;
+      }
+    }
+    // and fail them
     syncFutures.forEach(f -> f.done(f.getTxid(), error));
     if (!(consumeExecutor instanceof EventLoop)) {
       consumeExecutor.shutdown();
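The block added to AsyncFSWAL.doShutdown() above walks the Disruptor ring buffer, collects every sync request that was published but never consumed, and completes it with the "WAL has been closed" error so the callers blocked in sync() can make progress. The sketch below illustrates the same idea with CompletableFuture standing in for the WAL's SyncFuture; it is an assumption-level illustration, not the actual WAL API.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;

public class FailPendingSyncsSketch {
  private final List<CompletableFuture<Long>> pendingSyncs = new CopyOnWriteArrayList<>();

  CompletableFuture<Long> sync() {
    CompletableFuture<Long> f = new CompletableFuture<>();
    pendingSyncs.add(f); // queued, but the (broken) writer will never complete it
    return f;
  }

  void shutdown() {
    IOException error = new IOException("WAL has been closed");
    // Drain everything still queued and fail it, so blocked callers wake up instead of hanging.
    pendingSyncs.forEach(f -> f.completeExceptionally(error));
  }

  public static void main(String[] args) throws InterruptedException {
    FailPendingSyncsSketch wal = new FailPendingSyncsSketch();
    CompletableFuture<Long> pending = wal.sync();
    wal.shutdown();
    try {
      pending.get(); // returns immediately with the close error
    } catch (ExecutionException e) {
      System.out.println("sync failed as expected: " + e.getCause().getMessage());
    }
  }
}
```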
@@ -0,0 +1,164 @@ org.apache.hadoop.hbase.regionserver.TestShutdownWhileWALBroken (new file)
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * See HBASE-19929 for more details.
+ */
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestShutdownWhileWALBroken {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestShutdownWhileWALBroken.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestShutdownWhileWALBroken.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("TestShutdownWhileWALBroken");
+
+  private static byte[] CF = Bytes.toBytes("CF");
+
+  @Parameter
+  public String walType;
+
+  @Parameters(name = "{index}: WAL={0}")
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
+  }
+
+  public static final class MyRegionServer extends HRegionServer {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    public MyRegionServer(Configuration conf) throws IOException {
+      super(conf);
+    }
+
+    @Override
+    protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
+        throws IOException {
+      try {
+        super.tryRegionServerReport(reportStartTime, reportEndTime);
+      } catch (YouAreDeadException e) {
+        LOG.info("Caught YouAreDeadException, ignore", e);
+      }
+    }
+
+    @Override
+    public void abort(String reason, Throwable cause) {
+      if (cause instanceof SessionExpiredException) {
+        // called from ZKWatcher, let's wait a bit to make sure that we call stop before calling
+        // abort.
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+        }
+      } else {
+        // abort from other classes, usually LogRoller, now we can make progress on abort.
+        latch.countDown();
+      }
+      super.abort(reason, cause);
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, MyRegionServer.class,
+      HRegionServer.class);
+    UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walType);
+    UTIL.getConfiguration().set(WALFactory.META_WAL_PROVIDER, walType);
+    UTIL.startMiniCluster(2);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    UTIL.createMultiRegionTable(TABLE_NAME, CF);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
+      UTIL.loadTable(table, CF);
+    }
+    int numRegions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).size();
+    RegionServerThread rst0 = UTIL.getMiniHBaseCluster().getRegionServerThreads().get(0);
+    RegionServerThread rst1 = UTIL.getMiniHBaseCluster().getRegionServerThreads().get(1);
+    HRegionServer liveRS;
+    RegionServerThread toKillRSThread;
+    if (rst1.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty()) {
+      liveRS = rst0.getRegionServer();
+      toKillRSThread = rst1;
+    } else {
+      liveRS = rst1.getRegionServer();
+      toKillRSThread = rst0;
+    }
+    assertTrue(liveRS.getRegions(TABLE_NAME).size() < numRegions);
+    UTIL.expireSession(toKillRSThread.getRegionServer().getZooKeeper(), false);
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return liveRS.getRegions(TABLE_NAME).size() == numRegions;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Failover is not finished yet";
+      }
+    });
+    toKillRSThread.getRegionServer().stop("Stop for test");
+    // make sure that we can successfully quit
+    toKillRSThread.join();
+  }
+}
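MyRegionServer above parks the abort triggered by the expired ZooKeeper session on a latch until some other path (in practice the LogRoller) aborts first, which is exactly the interleaving HBASE-19929 needs in order to reproduce the hang. The sketch below isolates that ordering trick; the class and method names are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AbortOrderingSketch {
  private final CountDownLatch latch = new CountDownLatch(1);

  void abortFromZkExpiration() throws InterruptedException {
    // Hold this path until the other abort path has run (bounded so the sketch cannot hang).
    latch.await(10, TimeUnit.SECONDS);
    System.out.println("abort(SessionExpiredException) proceeding");
  }

  void abortFromLogRoller() {
    latch.countDown();
    System.out.println("abort(log roller failure) proceeding");
  }

  public static void main(String[] args) throws InterruptedException {
    AbortOrderingSketch rs = new AbortOrderingSketch();
    Thread zkAbort = new Thread(() -> {
      try {
        rs.abortFromZkExpiration();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    zkAbort.start();
    rs.abortFromLogRoller(); // releases the parked ZK-expiration abort
    zkAbort.join();
  }
}
```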
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -43,9 +42,9 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.wal.DamagedWALException;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.Threads;
@@ -67,11 +66,12 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
 /**
- * Testing for lock up of WAL subsystem.
- * Copied from TestHRegion.
+ * Testing for lock up of FSHLog.
  */
-@Category({MediumTests.class})
+@Category({ RegionServerTests.class, MediumTests.class })
 public class TestWALLockup {
 
   @ClassRule
@@ -79,14 +79,15 @@ public class TestWALLockup {
       HBaseClassTestRule.forClass(TestWALLockup.class);
 
   private static final Logger LOG = LoggerFactory.getLogger(TestWALLockup.class);
-  @Rule public TestName name = new TestName();
+
+  @Rule
+  public TestName name = new TestName();
 
   private static final String COLUMN_FAMILY = "MyCF";
   private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
 
   HRegion region = null;
-  // Do not run unit tests in parallel (? Why not? It don't work? Why not? St.Ack)
-  private static HBaseTestingUtility TEST_UTIL;
+  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static Configuration CONF ;
   private String dir;
 
@@ -95,7 +96,6 @@ public class TestWALLockup {
 
   @Before
   public void setup() throws IOException {
-    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     CONF = TEST_UTIL.getConfiguration();
     // Disable block cache.
     CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
@@ -110,10 +110,90 @@ public class TestWALLockup {
     TEST_UTIL.cleanupTestDir();
   }
 
-  String getName() {
+  private String getName() {
     return name.getMethodName();
   }
 
+  // A WAL that we can have throw exceptions when a flag is set.
+  private static final class DodgyFSLog extends FSHLog {
+    // Set this when want the WAL to start throwing exceptions.
+    volatile boolean throwException = false;
+
+    // Latch to hold up processing until after another operation has had time to run.
+    CountDownLatch latch = new CountDownLatch(1);
+
+    public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
+        throws IOException {
+      super(fs, root, logDir, conf);
+    }
+
+    @Override
+    protected void afterCreatingZigZagLatch() {
+      // If throwException set, then append will throw an exception causing the WAL to be
+      // rolled. We'll come in here. Hold up processing until a sync can get in before
+      // the zigzag has time to complete its setup and get its own sync in. This is what causes
+      // the lock up we've seen in production.
+      if (throwException) {
+        try {
+          LOG.info("LATCHED");
+          // So, timing can have it that the test can run and the bad flush below happens
+          // before we get here. In this case, we'll be stuck waiting on this latch but there
+          // is nothing in the WAL pipeline to get us to the below beforeWaitOnSafePoint...
+          // because all WALs have rolled. In this case, just give up on test.
+          if (!this.latch.await(5, TimeUnit.SECONDS)) {
+            LOG.warn("GIVE UP! Failed waiting on latch...Test is ABORTED!");
+          }
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+
+    @Override
+    protected void beforeWaitOnSafePoint() {
+      if (throwException) {
+        LOG.info("COUNTDOWN");
+        // Don't countdown latch until someone waiting on it otherwise, the above
+        // afterCreatingZigZagLatch will get to the latch and no one will ever free it and we'll
+        // be stuck; test won't go down
+        while (this.latch.getCount() <= 0) {
+          Threads.sleep(1);
+        }
+        this.latch.countDown();
+      }
+    }
+
+    @Override
+    protected Writer createWriterInstance(Path path) throws IOException {
+      final Writer w = super.createWriterInstance(path);
+      return new Writer() {
+        @Override
+        public void close() throws IOException {
+          w.close();
+        }
+
+        @Override
+        public void sync() throws IOException {
+          if (throwException) {
+            throw new IOException("FAKE! Failed to replace a bad datanode...SYNC");
+          }
+          w.sync();
+        }
+
+        @Override
+        public void append(Entry entry) throws IOException {
+          if (throwException) {
+            throw new IOException("FAKE! Failed to replace a bad datanode...APPEND");
+          }
+          w.append(entry);
+        }
+
+        @Override
+        public long getLength() {
+          return w.getLength();
+        }
+      };
+    }
+  }
+
   /**
    * Reproduce locking up that happens when we get an inopportune sync during setup for
    * zigzaglatch wait. See HBASE-14317. If below is broken, we will see this test timeout because
@@ -121,89 +201,8 @@ public class TestWALLockup {
    * <p>First I need to set up some mocks for Server and RegionServerServices. I also need to
    * set up a dodgy WAL that will throw an exception when we go to append to it.
    */
-  @Test (timeout=20000)
+  @Test
   public void testLockupWhenSyncInMiddleOfZigZagSetup() throws IOException {
-    // A WAL that we can have throw exceptions when a flag is set.
-    class DodgyFSLog extends FSHLog {
-      // Set this when want the WAL to start throwing exceptions.
-      volatile boolean throwException = false;
-
-      // Latch to hold up processing until after another operation has had time to run.
-      CountDownLatch latch = new CountDownLatch(1);
-
-      public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf)
-          throws IOException {
-        super(fs, root, logDir, conf);
-      }
-
-      @Override
-      protected void afterCreatingZigZagLatch() {
-        // If throwException set, then append will throw an exception causing the WAL to be
-        // rolled. We'll come in here. Hold up processing until a sync can get in before
-        // the zigzag has time to complete its setup and get its own sync in. This is what causes
-        // the lock up we've seen in production.
-        if (throwException) {
-          try {
-            LOG.info("LATCHED");
-            // So, timing can have it that the test can run and the bad flush below happens
-            // before we get here. In this case, we'll be stuck waiting on this latch but there
-            // is nothing in the WAL pipeline to get us to the below beforeWaitOnSafePoint...
-            // because all WALs have rolled. In this case, just give up on test.
-            if (!this.latch.await(5, TimeUnit.SECONDS)) {
-              LOG.warn("GIVE UP! Failed waiting on latch...Test is ABORTED!");
-            }
-          } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-          }
-        }
-      }
-
-      @Override
-      protected void beforeWaitOnSafePoint() {
-        if (throwException) {
-          LOG.info("COUNTDOWN");
-          // Don't countdown latch until someone waiting on it otherwise, the above
-          // afterCreatingZigZagLatch will get to the latch and no one will ever free it and we'll
-          // be stuck; test won't go down
-          while (this.latch.getCount() <= 0) Threads.sleep(1);
-          this.latch.countDown();
-        }
-      }
-
-      @Override
-      protected Writer createWriterInstance(Path path) throws IOException {
-        final Writer w = super.createWriterInstance(path);
-        return new Writer() {
-          @Override
-          public void close() throws IOException {
-            w.close();
-          }
-
-          @Override
-          public void sync() throws IOException {
-            if (throwException) {
-              throw new IOException("FAKE! Failed to replace a bad datanode...SYNC");
-            }
-            w.sync();
-          }
-
-          @Override
-          public void append(Entry entry) throws IOException {
-            if (throwException) {
-              throw new IOException("FAKE! Failed to replace a bad datanode...APPEND");
-            }
-            w.append(entry);
-          }
-
-          @Override
-          public long getLength() {
-            return w.getLength();
-          }
-        };
-      }
-    }
-
     // Mocked up server and regionserver services. Needed below.
     Server server = Mockito.mock(Server.class);
     Mockito.when(server.getConfiguration()).thenReturn(CONF);
@@ -222,7 +221,6 @@ public class TestWALLockup {
     // There is no 'stop' once a logRoller is running.. it just dies.
     logRoller.start();
     // Now get a region and start adding in edits.
-    HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
     final HRegion region = initHRegion(tableName, null, null, dodgyWAL);
     byte [] bytes = Bytes.toBytes(getName());
     NavigableMap<byte[], Integer> scopes = new TreeMap<>(
@@ -236,7 +234,7 @@ public class TestWALLockup {
       Put put = new Put(bytes);
       put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
       WALKeyImpl key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
-        htd.getTableName(), System.currentTimeMillis(), mvcc, scopes);
+        TableName.META_TABLE_NAME, System.currentTimeMillis(), mvcc, scopes);
       WALEdit edit = new WALEdit();
       CellScanner CellScanner = put.cellScanner();
       assertTrue(CellScanner.advance());
@@ -281,7 +279,9 @@ public class TestWALLockup {
       t.setDaemon(true);
       t.start();
       // Wait until
-      while (dodgyWAL.latch.getCount() > 0) Threads.sleep(1);
+      while (dodgyWAL.latch.getCount() > 0) {
+        Threads.sleep(1);
+      }
       // Now assert I got a new WAL file put in place even though loads of errors above.
       assertTrue(originalWAL != dodgyWAL.getCurrentFileName());
       // Can I append to it?
@@ -294,203 +294,13 @@ public class TestWALLockup {
     } finally {
       // To stop logRoller, its server has to say it is stopped.
       Mockito.when(server.isStopped()).thenReturn(true);
-      if (logRoller != null) logRoller.close();
-      try {
-        if (region != null) region.close();
-        if (dodgyWAL != null) dodgyWAL.close();
-      } catch (Exception e) {
-        LOG.info("On way out", e);
-      }
-    }
-  }
-
-  /**
-   * Reproduce locking up that happens when there's no further syncs after
-   * append fails, and causing an isolated sync then infinite wait. See
-   * HBASE-16960. If below is broken, we will see this test timeout because it
-   * is locked up.
-   * <p/>
-   * Steps for reproduce:<br/>
-   * 1. Trigger server abort through dodgyWAL1<br/>
-   * 2. Add a {@link DummyWALActionsListener} to dodgyWAL2 to cause ringbuffer
-   * event handler thread sleep for a while thus keeping {@code endOfBatch}
-   * false<br/>
-   * 3. Publish a sync then an append which will throw exception, check whether
-   * the sync could return
-   */
-  @Test(timeout = 20000)
-  public void testLockup16960() throws IOException {
-    // A WAL that we can have throw exceptions when a flag is set.
-    class DodgyFSLog extends FSHLog {
-      // Set this when want the WAL to start throwing exceptions.
-      volatile boolean throwException = false;
-
-      public DodgyFSLog(FileSystem fs, Path root, String logDir,
-          Configuration conf) throws IOException {
-        super(fs, root, logDir, conf);
-      }
-
-      @Override
-      protected Writer createWriterInstance(Path path) throws IOException {
-        final Writer w = super.createWriterInstance(path);
-        return new Writer() {
-          @Override
-          public void close() throws IOException {
-            w.close();
-          }
-
-          @Override
-          public void sync() throws IOException {
-            if (throwException) {
-              throw new IOException(
-                  "FAKE! Failed to replace a bad datanode...SYNC");
-            }
-            w.sync();
-          }
-
-          @Override
-          public void append(Entry entry) throws IOException {
-            if (throwException) {
-              throw new IOException(
-                  "FAKE! Failed to replace a bad datanode...APPEND");
-            }
-            w.append(entry);
-          }
-
-          @Override
-          public long getLength() {
-            return w.getLength();
-          }
-        };
-      }
-
-      @Override
-      protected long doReplaceWriter(Path oldPath, Path newPath,
-          Writer nextWriter) throws IOException {
-        if (throwException) {
-          throw new FailedLogCloseException("oldPath=" + oldPath + ", newPath="
-              + newPath);
-        }
-        long oldFileLen = 0L;
-        oldFileLen = super.doReplaceWriter(oldPath, newPath, nextWriter);
-        return oldFileLen;
-      }
-    }
-
-    // Mocked up server and regionserver services. Needed below.
-    Server server = new DummyServer(CONF, ServerName.valueOf(
-        "hostname1.example.org", 1234, 1L).toString());
-    RegionServerServices services = Mockito.mock(RegionServerServices.class);
-
-    CONF.setLong("hbase.regionserver.hlog.sync.timeout", 10000);
-
-    // OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL,
-    // go ahead with test.
-    FileSystem fs = FileSystem.get(CONF);
-    Path rootDir = new Path(dir + getName());
-    DodgyFSLog dodgyWAL1 = new DodgyFSLog(fs, rootDir, getName(), CONF);
-
-    Path rootDir2 = new Path(dir + getName() + "2");
-    final DodgyFSLog dodgyWAL2 = new DodgyFSLog(fs, rootDir2, getName() + "2",
-        CONF);
-    // Add a listener to force ringbuffer event handler sleep for a while
-    dodgyWAL2.registerWALActionsListener(new DummyWALActionsListener());
-
-    // I need a log roller running.
-    LogRoller logRoller = new LogRoller(server, services);
-    logRoller.addWAL(dodgyWAL1);
-    logRoller.addWAL(dodgyWAL2);
-    // There is no 'stop' once a logRoller is running.. it just dies.
-    logRoller.start();
-    // Now get a region and start adding in edits.
-    HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
-    final HRegion region = initHRegion(tableName, null, null, dodgyWAL1);
-    byte[] bytes = Bytes.toBytes(getName());
-    NavigableMap<byte[], Integer> scopes = new TreeMap<>(
-        Bytes.BYTES_COMPARATOR);
-    scopes.put(COLUMN_FAMILY_BYTES, 0);
-    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-    try {
-      Put put = new Put(bytes);
-      put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
-      WALKeyImpl key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
-          htd.getTableName(), System.currentTimeMillis(), mvcc, scopes);
-      WALEdit edit = new WALEdit();
-      CellScanner CellScanner = put.cellScanner();
-      assertTrue(CellScanner.advance());
-      edit.add(CellScanner.current());
-
-      LOG.info("SET throwing of exception on append");
-      dodgyWAL1.throwException = true;
-      // This append provokes a WAL roll request
-      dodgyWAL1.append(region.getRegionInfo(), key, edit, true);
-      boolean exception = false;
-      try {
-        dodgyWAL1.sync();
-      } catch (Exception e) {
-        exception = true;
-      }
-      assertTrue("Did not get sync exception", exception);
-
-      // LogRoller call dodgyWAL1.rollWriter get FailedLogCloseException and
-      // cause server abort.
-      try {
-        // wait LogRoller exit.
-        Thread.sleep(50);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-
-      final CountDownLatch latch = new CountDownLatch(1);
-
-      // make RingBufferEventHandler sleep 1s, so the following sync
-      // endOfBatch=false
-      key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
-          TableName.valueOf("sleep"), System.currentTimeMillis(), mvcc, scopes);
-      dodgyWAL2.append(region.getRegionInfo(), key, edit, true);
-
-      Thread t = new Thread("Sync") {
-        @Override
-        public void run() {
-          try {
-            dodgyWAL2.sync();
-          } catch (IOException e) {
-            LOG.info("In sync", e);
-          }
-          latch.countDown();
-          LOG.info("Sync exiting");
-        }
-      };
-      t.setDaemon(true);
-      t.start();
-      try {
-        // make sure sync have published.
-        Thread.sleep(100);
-      } catch (InterruptedException e1) {
-        e1.printStackTrace();
-      }
-      // make append throw DamagedWALException
-      key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
-          TableName.valueOf("DamagedWALException"), System.currentTimeMillis(), mvcc, scopes);
-      dodgyWAL2.append(region.getRegionInfo(), key, edit, true);
-
-      while (latch.getCount() > 0) {
-        Threads.sleep(100);
-      }
-      assertTrue(server.isAborted());
-    } finally {
-      if (logRoller != null) {
-        logRoller.close();
-      }
+      Closeables.close(logRoller, true);
       try {
         if (region != null) {
           region.close();
         }
-        if (dodgyWAL1 != null) {
-          dodgyWAL1.close();
+        if (dodgyWAL != null) {
+          dodgyWAL.close();
         }
-        if (dodgyWAL2 != null) {
-          dodgyWAL2.close();
-        }
       } catch (Exception e) {
         LOG.info("On way out", e);
@@ -606,11 +416,11 @@ public class TestWALLockup {
   }
 
   /**
-   * @return A region on which you must call
-   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
+   * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
+   *         when done.
    */
-  public static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
+  private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey, WAL wal)
       throws IOException {
     ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
     return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, false, Durability.SYNC_WAL,
       wal, COLUMN_FAMILY_BYTES);