diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
index 5990c2273fd..1604872ed97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
@@ -24,7 +24,9 @@ import java.util.Deque;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -145,15 +147,68 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
     edit.logSyncWait();
   }
 
+  // draining permits is intended to provide a high priority reservation.
+  // however, release of outstanding permits must be postponed until
+  // drained permits are restored to avoid starvation. logic has some races
+  // but is good enough to serve its purpose.
+  private Semaphore overflowMutex = new Semaphore(8){
+    private AtomicBoolean draining = new AtomicBoolean();
+    private AtomicInteger pendingReleases = new AtomicInteger();
+    @Override
+    public int drainPermits() {
+      draining.set(true);
+      return super.drainPermits();
+    }
+    // while draining, count the releases until release(int)
+    private void tryRelease(int permits) {
+      pendingReleases.getAndAdd(permits);
+      if (!draining.get()) {
+        super.release(pendingReleases.getAndSet(0));
+      }
+    }
+    @Override
+    public void release() {
+      tryRelease(1);
+    }
+    @Override
+    public void release(int permits) {
+      draining.set(false);
+      tryRelease(permits);
+    }
+  };
+
   private void enqueueEdit(Edit edit) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("logEdit " + edit);
     }
     try {
-      if (!editPendingQ.offer(edit, 1, TimeUnit.SECONDS)) {
+      // not checking for overflow yet to avoid penalizing performance of
+      // the common case. if there is persistent overflow, a mutex will be
+      // used to throttle contention on the queue.
+      if (!editPendingQ.offer(edit)) {
         Preconditions.checkState(
             isSyncThreadAlive(), "sync thread is not alive");
-        editPendingQ.put(edit);
+        if (Thread.holdsLock(this)) {
+          // if queue is full, synchronized caller must immediately relinquish
+          // the monitor before re-offering to avoid deadlock with sync thread
+          // which needs the monitor to write transactions.
+          int permits = overflowMutex.drainPermits();
+          try {
+            do {
+              this.wait(1000); // will be notified by next logSync.
+            } while (!editPendingQ.offer(edit));
+          } finally {
+            overflowMutex.release(permits);
+          }
+        } else {
+          // mutex will throttle contention during persistent overflow.
+          overflowMutex.acquire();
+          try {
+            editPendingQ.put(edit);
+          } finally {
+            overflowMutex.release();
+          }
+        }
      }
     } catch (Throwable t) {
       // should never happen!  failure to enqueue an edit is fatal
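
A note on the fix above before the test diff: the Semaphore subclass is the subtle part. drainPermits() gives a monitor-holding caller a high-priority reservation on the overflow mutex, and ordinary release() calls are deferred until the drained permits are restored via release(int), so the priority reservation is not starved by ordinary traffic. Below is a minimal, self-contained sketch of that deferred-release pattern, not part of the patch; class and variable names are illustrative. It can be run standalone to see the bookkeeping behave as the patch's comment claims.

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch (outside HDFS) of the patch's "priority reservation"
// semaphore: drainPermits() marks the semaphore as draining so ordinary
// release() calls are deferred; release(int) restores the drained permits
// plus anything deferred in the meantime. As the patch notes, the logic
// tolerates benign races; it throttles, it does not guarantee fairness.
public class OverflowMutexSketch {
  static class PriorityDrainSemaphore extends Semaphore {
    private final AtomicBoolean draining = new AtomicBoolean();
    private final AtomicInteger pendingReleases = new AtomicInteger();

    PriorityDrainSemaphore(int permits) {
      super(permits);
    }

    @Override
    public int drainPermits() {
      draining.set(true);          // high-priority caller reserves the mutex
      return super.drainPermits();
    }

    private void tryRelease(int permits) {
      pendingReleases.getAndAdd(permits);
      if (!draining.get()) {       // only pass releases through when not draining
        super.release(pendingReleases.getAndSet(0));
      }
    }

    @Override
    public void release() {
      tryRelease(1);               // deferred while draining
    }

    @Override
    public void release(int permits) {
      draining.set(false);         // priority caller is done; restore permits
      tryRelease(permits);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    PriorityDrainSemaphore sem = new PriorityDrainSemaphore(2);
    sem.acquire();                      // an ordinary caller holds one permit
    int drained = sem.drainPermits();   // priority caller grabs the rest (1)
    sem.release();                      // ordinary release is deferred
    System.out.println("available while draining: " + sem.availablePermits()); // 0
    sem.release(drained);               // restores drained + deferred permits
    System.out.println("available after restore: " + sem.availablePermits());  // 2
  }
}
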
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
index 10f571c4d09..8f26fe3322b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
@@ -28,12 +28,19 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
+import org.mockito.Mockito;
 import org.slf4j.event.Level;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -565,5 +573,153 @@ public class TestEditLogRace {
       LOG.info("Closing nn");
       if(namesystem != null) namesystem.close();
     }
-  }  
+  }
+
+  @Test(timeout=180000)
+  public void testDeadlock() throws Throwable {
+    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO);
+
+    Configuration conf = getConf();
+    NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
+    DFSTestUtil.formatNameNode(conf);
+    final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf);
+
+    final AtomicBoolean done = new AtomicBoolean(false);
+    final Semaphore blockerSemaphore = new Semaphore(0);
+    final CountDownLatch startSpamLatch = new CountDownLatch(1);
+
+    ExecutorService executor = Executors.newCachedThreadPool();
+    try {
+      final FSEditLog editLog = namesystem.getEditLog();
+
+      FSEditLogOp.OpInstanceCache cache = editLog.cache.get();
+      final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache)
+          .setSource("/").setUser("u").setGroup("g");
+      // don't reset fields so instance can be reused.
+      final FSEditLogOp reuseOp = Mockito.spy(op);
+      Mockito.doNothing().when(reuseOp).reset();
+
+      // only job is to spam edits. they will fill the queue when the test
+      // loop injects the blockingOp.
+      Future[] logSpammers = new Future[16];
+      for (int i=0; i < logSpammers.length; i++) {
+        final int ii = i;
+        logSpammers[i] = executor.submit(new Callable() {
+          @Override
+          public Void call() throws Exception {
+            Thread.currentThread().setName("Log spammer " + ii);
+            // wait until a blocking edit op notifies us to go.
+            startSpamLatch.await();
+            for (int i = 0; !done.get() && i < 1000000; i++) {
+              // do not logSync here because we need to congest the queue.
+              editLog.logEdit(reuseOp);
+              if (i % 2048 == 0) {
+                LOG.info("thread[" + ii + "] edits=" + i);
+              }
+            }
+            assertTrue("too many edits", done.get());
+            return null;
+          }
+        });
+      }
+
+      // the tx id is set while the edit log monitor is held, so this will
+      // effectively stall the async processing thread which will cause the
+      // edit queue to fill up.
+      final FSEditLogOp blockingOp = Mockito.spy(op);
+      doAnswer(
+        new Answer() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            // flip the latch to unleash the spamming threads to congest
+            // the queue.
+            startSpamLatch.countDown();
+            // wait until unblocked after a synchronized thread is started.
+            blockerSemaphore.acquire();
+            invocation.callRealMethod();
+            return null;
+          }
+        }
+      ).when(blockingOp).setTransactionId(Mockito.anyLong());
+      // don't reset fields so instance can be reused.
+      Mockito.doNothing().when(blockingOp).reset();
+
+      // repeatedly overflow the queue and verify it doesn't deadlock.
+      for (int i = 0; i < 8; i++) {
+        // when the blockingOp is logged, it triggers the latch to unleash the
+        // spammers to overflow the edit queue, then waits for a permit
+        // from blockerSemaphore that will be released at the bottom of
+        // this loop.
+        Future blockingEdit = executor.submit(new Callable() {
+          @Override
+          public Void call() throws Exception {
+            Thread.currentThread().setName("Log blocker");
+            editLog.logEdit(blockingOp);
+            editLog.logSync();
+            return null;
+          }
+        });
+
+        // wait for spammers to seize up the edit log.
+        long startTxId = editLog.getLastWrittenTxIdWithoutLock();
+        final long[] txIds = { startTxId, startTxId, startTxId };
+        GenericTestUtils.waitFor(new Supplier() {
+          @Override
+          public Boolean get() {
+            txIds[0] = txIds[1];
+            txIds[1] = txIds[2];
+            txIds[2] = editLog.getLastWrittenTxIdWithoutLock();
+            return (txIds[0] == txIds[1] &&
+                    txIds[1] == txIds[2] &&
+                    txIds[2] > startTxId);
+          }
+        }, 100, 10000);
+
+        // callers that synchronize on the edit log while the queue is full
+        // are prone to deadlock if the locking is incorrect. at this point:
+        // 1. the blocking edit is holding the log's monitor.
+        // 2. the spammers have filled the queue.
+        // 3. the spammers are blocked waiting to queue another edit.
+        // Now we'll start another thread to synchronize on the log (simulates
+        // what log rolling does), unblock the op currently holding the
+        // monitor, and ensure deadlock does not occur.
+        CountDownLatch readyLatch = new CountDownLatch(1);
+        Future synchedEdits = executor.submit(new Callable() {
+          @Override
+          public Void call() throws Exception {
+            Thread.currentThread().setName("Log synchronizer");
+            // the sync is CRUCIAL for this test. it's what causes edit
+            // log rolling to deadlock when queue is full.
+            readyLatch.countDown();
+            synchronized (editLog) {
+              editLog.logEdit(reuseOp);
+              editLog.logSync();
+            }
+            return null;
+          }
+        });
+        // unblock the edit jammed in setting its txid. queued edits should
+        // start flowing and the synced edits should complete.
+        readyLatch.await();
+        blockerSemaphore.release();
+        blockingEdit.get();
+        synchedEdits.get();
+      }
+
+      // tell spammers to stop.
+      done.set(true);
+      for (int i=0; i < logSpammers.length; i++) {
+        logSpammers[i].get();
+      }
+      // just make sure everything can be synced.
+      editLog.logSyncAll();
+    } finally {
+      LOG.info("Closing nn");
+      executor.shutdownNow();
+      if (namesystem != null) {
+        namesystem.getFSImage().getStorage().close();
+        namesystem.close();
+      }
+    }
+  }
 }
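
For context on what testDeadlock exercises: the hazard is a thread that blocks on put() into the full edit queue while holding the edit log's monitor, which the sync thread needs before it can drain the queue. The sketch below is not HDFS code and all names are illustrative; it reproduces that shape in miniature and shows the patch's escape hatch of wait()ing on the monitor, which releases it while the producer re-offers.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch of the deadlock shape the test guards against, plus
// the fix. A producer that called put() on a full queue while holding a
// monitor the consumer needs would deadlock; instead, a monitor-holding
// producer must wait() -- releasing the monitor -- and re-offer until the
// consumer makes room.
public class MonitorFullQueueSketch {
  public static void main(String[] args) throws Exception {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
    Object monitor = new Object();
    queue.put(0); // queue starts full

    Thread consumer = new Thread(() -> {
      synchronized (monitor) {   // consumer needs the monitor to drain
        queue.clear();
        monitor.notifyAll();     // analogous to logSync notifying waiters
      }
    }, "consumer");

    synchronized (monitor) {
      consumer.start();
      // queue.put(1) here would deadlock: the consumer could never enter
      // the monitor to make room. Waiting releases the monitor instead.
      while (!queue.offer(1)) {
        monitor.wait(1000);
      }
    }
    consumer.join();
    System.out.println("completed without deadlock; queue=" + queue);
  }
}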