HDFS-13051. Fix dead lock during async editlog rolling if edit queue is full. Contributed by Daryn Sharp.

(cherry picked from commit 8e54da1511)

 Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
This commit is contained in:
Xiao Chen 2018-09-10 22:14:02 -07:00
parent 77dd456461
commit 2dd27c999b
2 changed files with 215 additions and 4 deletions

View File

@ -24,7 +24,9 @@ import java.util.Deque;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -145,15 +147,68 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
edit.logSyncWait(); edit.logSyncWait();
} }
// draining permits is intended to provide a high priority reservation.
// however, release of outstanding permits must be postponed until
// drained permits are restored to avoid starvation. logic has some races
// but is good enough to serve its purpose.
private Semaphore overflowMutex = new Semaphore(8){
private AtomicBoolean draining = new AtomicBoolean();
private AtomicInteger pendingReleases = new AtomicInteger();
@Override
public int drainPermits() {
draining.set(true);
return super.drainPermits();
}
// while draining, count the releases until release(int)
private void tryRelease(int permits) {
pendingReleases.getAndAdd(permits);
if (!draining.get()) {
super.release(pendingReleases.getAndSet(0));
}
}
@Override
public void release() {
tryRelease(1);
}
@Override
public void release(int permits) {
draining.set(false);
tryRelease(permits);
}
};
private void enqueueEdit(Edit edit) { private void enqueueEdit(Edit edit) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("logEdit " + edit); LOG.debug("logEdit " + edit);
} }
try { try {
if (!editPendingQ.offer(edit, 1, TimeUnit.SECONDS)) { // not checking for overflow yet to avoid penalizing performance of
// the common case. if there is persistent overflow, a mutex will be
// use to throttle contention on the queue.
if (!editPendingQ.offer(edit)) {
Preconditions.checkState( Preconditions.checkState(
isSyncThreadAlive(), "sync thread is not alive"); isSyncThreadAlive(), "sync thread is not alive");
editPendingQ.put(edit); if (Thread.holdsLock(this)) {
// if queue is full, synchronized caller must immediately relinquish
// the monitor before re-offering to avoid deadlock with sync thread
// which needs the monitor to write transactions.
int permits = overflowMutex.drainPermits();
try {
do {
this.wait(1000); // will be notified by next logSync.
} while (!editPendingQ.offer(edit));
} finally {
overflowMutex.release(permits);
}
} else {
// mutex will throttle contention during persistent overflow.
overflowMutex.acquire();
try {
editPendingQ.put(edit);
} finally {
overflowMutex.release();
}
}
} }
} catch (Throwable t) { } catch (Throwable t) {
// should never happen! failure to enqueue an edit is fatal // should never happen! failure to enqueue an edit is fatal

View File

@ -28,12 +28,19 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -52,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.mockito.Mockito;
import org.slf4j.event.Level; import org.slf4j.event.Level;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -565,5 +573,153 @@ public class TestEditLogRace {
LOG.info("Closing nn"); LOG.info("Closing nn");
if(namesystem != null) namesystem.close(); if(namesystem != null) namesystem.close();
} }
} }
@Test(timeout=180000)
public void testDeadlock() throws Throwable {
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO);
Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
DFSTestUtil.formatNameNode(conf);
final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
final AtomicBoolean done = new AtomicBoolean(false);
final Semaphore blockerSemaphore = new Semaphore(0);
final CountDownLatch startSpamLatch = new CountDownLatch(1);
ExecutorService executor = Executors.newCachedThreadPool();
try {
final FSEditLog editLog = namesystem.getEditLog();
FSEditLogOp.OpInstanceCache cache = editLog.cache.get();
final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache)
.setSource("/").setUser("u").setGroup("g");
// don't reset fields so instance can be reused.
final FSEditLogOp reuseOp = Mockito.spy(op);
Mockito.doNothing().when(reuseOp).reset();
// only job is spam edits. it will fill the queue when the test
// loop injects the blockingOp.
Future[] logSpammers = new Future[16];
for (int i=0; i < logSpammers.length; i++) {
final int ii = i;
logSpammers[i] = executor.submit(new Callable() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log spammer " + ii);
// wait until a blocking edit op notifies us to go.
startSpamLatch.await();
for (int i = 0; !done.get() && i < 1000000; i++) {
// do not logSync here because we need to congest the queue.
editLog.logEdit(reuseOp);
if (i % 2048 == 0) {
LOG.info("thread[" + ii +"] edits=" + i);
}
}
assertTrue("too many edits", done.get());
return null;
}
});
}
// the tx id is set while the edit log monitor is held, so this will
// effectively stall the async processing thread which will cause the
// edit queue to fill up.
final FSEditLogOp blockingOp = Mockito.spy(op);
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
// flip the latch to unleash the spamming threads to congest
// the queue.
startSpamLatch.countDown();
// wait until unblocked after a synchronized thread is started.
blockerSemaphore.acquire();
invocation.callRealMethod();
return null;
}
}
).when(blockingOp).setTransactionId(Mockito.anyLong());
// don't reset fields so instance can be reused.
Mockito.doNothing().when(blockingOp).reset();
// repeatedly overflow the queue and verify it doesn't deadlock.
for (int i = 0; i < 8; i++) {
// when the blockingOp is logged, it triggers the latch to unleash the
// spammers to overflow the edit queue, then waits for a permit
// from blockerSemaphore that will be released at the bottom of
// this loop.
Future blockingEdit = executor.submit(new Callable() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log blocker");
editLog.logEdit(blockingOp);
editLog.logSync();
return null;
}
});
// wait for spammers to seize up the edit log.
long startTxId = editLog.getLastWrittenTxIdWithoutLock();
final long[] txIds = { startTxId, startTxId, startTxId };
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
txIds[0] = txIds[1];
txIds[1] = txIds[2];
txIds[2] = editLog.getLastWrittenTxIdWithoutLock();
return (txIds[0] == txIds[1] &&
txIds[1] == txIds[2] &&
txIds[2] > startTxId);
}
}, 100, 10000);
// callers that synchronize on the edit log while the queue is full
// are prone to deadlock if the locking is incorrect. at this point:
// 1. the blocking edit is holding the log's monitor.
// 2. the spammers have filled the queue.
// 3. the spammers are blocked waiting to queue another edit.
// Now we'll start another thread to synchronize on the log (simulates
// what log rolling does), unblock the op currently holding the
// monitor, and ensure deadlock does not occur.
CountDownLatch readyLatch = new CountDownLatch(1);
Future synchedEdits = executor.submit(new Callable() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log synchronizer");
// the sync is CRUCIAL for this test. it's what causes edit
// log rolling to deadlock when queue is full.
readyLatch.countDown();
synchronized (editLog) {
editLog.logEdit(reuseOp);
editLog.logSync();
}
return null;
}
});
// unblock the edit jammed in setting its txid. queued edits should
// start flowing and the synced edits should complete.
readyLatch.await();
blockerSemaphore.release();
blockingEdit.get();
synchedEdits.get();
}
// tell spammers to stop.
done.set(true);
for (int i=0; i < logSpammers.length; i++) {
logSpammers[i].get();
}
// just make sure everything can be synced.
editLog.logSyncAll();
} finally {
LOG.info("Closing nn");
executor.shutdownNow();
if (namesystem != null) {
namesystem.getFSImage().getStorage().close();
namesystem.close();
}
}
}
} }