From 8715a18f95129b482549f539322fe2241cd27ac3 Mon Sep 17 00:00:00 2001 From: Konstantin V Shvachko Date: Wed, 26 May 2021 12:07:13 -0700 Subject: [PATCH] HDFS-15915. Race condition with async edits logging due to updating txId outside of the namesystem log. Contributed by Konstantin V Shvachko. (cherry picked from commit 1abd03d68f4f236674ce929164cc460037730abb) # Conflicts: # hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java # hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java # hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java --- .../server/namenode/EditLogOutputStream.java | 11 ++ .../hdfs/server/namenode/FSEditLog.java | 43 +++++-- .../hdfs/server/namenode/FSEditLogAsync.java | 7 +- .../hdfs/server/namenode/JournalSet.java | 19 ++- .../hdfs/server/namenode/NameNodeAdapter.java | 36 +++++- .../hdfs/server/namenode/TestEditLogRace.java | 65 ++++++---- .../hdfs/server/namenode/ha/HATestUtil.java | 12 ++ .../server/namenode/ha/TestObserverNode.java | 121 +++++++++++++++++- 8 files changed, 276 insertions(+), 38 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java index b4ca2d6c0df..dd931a9cc05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; /** * A generic abstract class to support journaling of edits logs into @@ -40,6 
+41,16 @@ public EditLogOutputStream() throws IOException { numSync = totalTimeSync = 0; } + /** + * Get the last txId journalled in the stream. + * The txId is recorded when FSEditLogOp is written to the stream. + * The default implementation is dummy. + * JournalSet tracks the txId uniformly for all underlying streams. + */ + public long getLastJournalledTxId() { + return HdfsServerConstants.INVALID_TXID; + }; + /** * Write edits log operation to the stream. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 0986129b704..783ab22c367 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -213,7 +213,10 @@ private static class TransactionId { private static final ThreadLocal myTransactionId = new ThreadLocal() { @Override protected synchronized TransactionId initialValue() { - return new TransactionId(Long.MAX_VALUE); + // If an RPC call did not generate any transactions, + // logSync() should exit without syncing + // Therefore the initial value of myTransactionId should be 0 + return new TransactionId(0L); } }; @@ -456,6 +459,7 @@ assert isOpenForWrite() : // wait if an automatic sync is scheduled waitIfAutoSyncScheduled(); + beginTransaction(op); // check if it is time to schedule an automatic sync needsSync = doEditTransaction(op); if (needsSync) { @@ -470,9 +474,13 @@ assert isOpenForWrite() : } synchronized boolean doEditTransaction(final FSEditLogOp op) { - long start = beginTransaction(); - op.setTransactionId(txid); + if (LOG.isDebugEnabled()) { + LOG.debug("doEditTx() op=" + op + " txid=" + txid); + } + assert op.hasTransactionId() : + "Transaction id is not set for " + op + " EditLog.txId=" + txid; + long start = 
monotonicNow(); try { editLogStream.write(op); } catch (IOException ex) { @@ -516,7 +524,7 @@ private boolean shouldForceSync() { return editLogStream.shouldForceSync(); } - private long beginTransaction() { + protected void beginTransaction(final FSEditLogOp op) { assert Thread.holdsLock(this); // get a new transactionId txid++; @@ -526,7 +534,9 @@ private long beginTransaction() { // TransactionId id = myTransactionId.get(); id.txid = txid; - return monotonicNow(); + if(op != null) { + op.setTransactionId(txid); + } } private void endTransaction(long start) { @@ -643,7 +653,7 @@ public void logSync() { } protected void logSync(long mytxid) { - long syncStart = 0; + long lastJournalledTxId = HdfsServerConstants.INVALID_TXID; boolean sync = false; long editsBatchedInSync = 0; try { @@ -670,8 +680,18 @@ protected void logSync(long mytxid) { // now, this thread will do the sync. track if other edits were // included in the sync - ie. batched. if this is the only edit // synced then the batched count is 0 - editsBatchedInSync = txid - synctxid - 1; - syncStart = txid; + lastJournalledTxId = editLogStream.getLastJournalledTxId(); + if (LOG.isDebugEnabled()) { + LOG.debug("logSync(tx) synctxid=" + " lastJournalledTxId=" + + lastJournalledTxId + " mytxid=" + mytxid); + } + assert lastJournalledTxId <= txid : "lastJournalledTxId exceeds txid"; + // The stream has already been flushed, or there are no active streams + // We still try to flush up to mytxid + if(lastJournalledTxId <= synctxid) { + lastJournalledTxId = mytxid; + } + editsBatchedInSync = lastJournalledTxId - synctxid - 1; isSyncRunning = true; sync = true; @@ -731,7 +751,7 @@ protected void logSync(long mytxid) { // Prevent RuntimeException from blocking other log edit sync synchronized (this) { if (sync) { - synctxid = syncStart; + synctxid = lastJournalledTxId; for (JournalManager jm : journalSet.getJournalManagers()) { /** * {@link FileJournalManager#lastReadableTxId} is only meaningful @@ -739,7 +759,7 @@ 
protected void logSync(long mytxid) { * other types of {@link JournalManager}. */ if (jm instanceof FileJournalManager) { - ((FileJournalManager)jm).setLastReadableTxId(syncStart); + ((FileJournalManager)jm).setLastReadableTxId(synctxid); } } isSyncRunning = false; @@ -1501,7 +1521,8 @@ private synchronized BackupJournalManager findBackupJournal( * store yet. */ synchronized void logEdit(final int length, final byte[] data) { - long start = beginTransaction(); + beginTransaction(null); + long start = monotonicNow(); try { editLogStream.writeRaw(data, 0, length); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java index 3eb75b8fb85..d55cecfb73b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java @@ -115,9 +115,14 @@ public void close() { @Override void logEdit(final FSEditLogOp op) { + assert isOpenForWrite(); + Edit edit = getEditInstance(op); threadEdit.set(edit); - enqueueEdit(edit); + synchronized(this) { + enqueueEdit(edit); + beginTransaction(op); + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java index 758cbe0c7e1..ebd626f252b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.INVALID_TXID; import static 
org.apache.hadoop.util.ExitUtil.terminate; import java.io.IOException; @@ -194,9 +195,11 @@ public boolean isShared() { final int minimumRedundantJournals; private boolean closed; - + private long lastJournalledTxId; + JournalSet(int minimumRedundantResources) { this.minimumRedundantJournals = minimumRedundantResources; + lastJournalledTxId = INVALID_TXID; } @Override @@ -446,6 +449,16 @@ private class JournalSetOutputStream extends EditLogOutputStream { super(); } + /** + * Get the last txId journalled in the stream. + * The txId is recorded when FSEditLogOp is written to the journal. + * JournalSet tracks the txId uniformly for all underlying streams. + */ + @Override + public long getLastJournalledTxId() { + return lastJournalledTxId; + } + @Override public void write(final FSEditLogOp op) throws IOException { @@ -457,6 +470,10 @@ public void apply(JournalAndStream jas) throws IOException { } } }, "write op"); + + assert lastJournalledTxId < op.txid : "TxId order violation for op=" + + op + ", lastJournalledTxId=" + lastJournalledTxId; + lastJournalledTxId = op.txid; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index 4b205fd38e3..a26f22f898e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -21,6 +21,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; + +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import java.io.File; @@ -53,8 +55,13 @@ import org.apache.hadoop.ipc.Server; import 
org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.AccessControlException; +import org.mockito.ArgumentMatcher; +import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import static org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer.FSIMAGE_ATTRIBUTE_KEY; /** @@ -305,7 +312,34 @@ public static FSEditLog spyOnEditLog(NameNode nn) { } return spyEditLog; } - + + /** + * Spy on EditLog to delay execution of doEditTransaction() for MkdirOp. + */ + public static FSEditLog spyDelayMkDirTransaction( + final NameNode nn, final long delay) { + FSEditLog realEditLog = nn.getFSImage().getEditLog(); + FSEditLogAsync spyEditLog = (FSEditLogAsync) spy(realEditLog); + DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog); + Answer ans = new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(delay); + return (Boolean) invocation.callRealMethod(); + } + }; + ArgumentMatcher am = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + FSEditLogOp op = (FSEditLogOp) argument; + return op.opCode == FSEditLogOpCodes.OP_MKDIR; + } + }; + doAnswer(ans).when(spyEditLog).doEditTransaction( + Matchers.argThat(am)); + return spyEditLog; + } + public static JournalSet spyOnJournalSet(NameNode nn) { FSEditLog editLog = nn.getFSImage().getEditLog(); JournalSet js = Mockito.spy(editLog.getJournalSet()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java index 962c1c731ab..d4c50a128b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -53,6 +55,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp; import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; @@ -61,6 +65,7 @@ import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.mockito.Mockito; +import org.mockito.ArgumentMatcher; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -285,12 +290,12 @@ private long verifyEditLogs(FSNamesystem namesystem, FSImage fsimage, File editFile = new File(sd.getCurrentDir(), logFileName); - System.out.println("Verifying file: " + editFile); + LOG.info("Verifying file: " + editFile); FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId); long numEditsThisLog = loader.loadFSEdits( new EditLogFileInputStream(editFile), startTxId); - System.out.println("Number of edits: " + numEditsThisLog); + LOG.info("Number of edits: " + numEditsThisLog); assertTrue(numEdits == -1 || numEditsThisLog == numEdits); numEdits = numEditsThisLog; } @@ -575,9 +580,29 @@ public 
void run() { } } + static SetOwnerOp getSetOwnerOp(OpInstanceCache cache, String group) { + return ((SetOwnerOp)cache.get(OP_SET_OWNER)) + .setSource("/").setUser("u").setGroup(group); + } + + static class BlockingOpMatcher extends ArgumentMatcher { + @Override + public boolean matches(Object o) { + if(o instanceof FSEditLogOp.SetOwnerOp) { + FSEditLogOp.SetOwnerOp op = (FSEditLogOp.SetOwnerOp)o; + if("b".equals(op.groupname)) { + LOG.info("Blocking op: " + op); + return true; + } + } + return false; + } + } + @Test(timeout=180000) public void testDeadlock() throws Throwable { - GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO); + GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(FSEditLogAsync.LOG, Level.DEBUG); Configuration conf = getConf(); NameNode.initMetrics(conf, NamenodeRole.NAMENODE); @@ -590,21 +615,17 @@ public void testDeadlock() throws Throwable { ExecutorService executor = Executors.newCachedThreadPool(); try { - final FSEditLog editLog = namesystem.getEditLog(); + final FSEditLog editLog = spy(namesystem.getEditLog()); + DFSTestUtil.setEditLogForTesting(namesystem, editLog); - FSEditLogOp.OpInstanceCache cache = editLog.cache.get(); - final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache) - .setSource("/").setUser("u").setGroup("g"); - // don't reset fields so instance can be reused. - final FSEditLogOp reuseOp = Mockito.spy(op); - Mockito.doNothing().when(reuseOp).reset(); + final OpInstanceCache cache = editLog.cache.get(); // only job is spam edits. it will fill the queue when the test // loop injects the blockingOp. 
- Future[] logSpammers = new Future[16]; + Future[] logSpammers = new Future[16]; for (int i=0; i < logSpammers.length; i++) { final int ii = i; - logSpammers[i] = executor.submit(new Callable() { + logSpammers[i] = executor.submit(new Callable() { @Override public Void call() throws Exception { Thread.currentThread().setName("Log spammer " + ii); @@ -612,7 +633,7 @@ public Void call() throws Exception { startSpamLatch.await(); for (int i = 0; !done.get() && i < 1000000; i++) { // do not logSync here because we need to congest the queue. - editLog.logEdit(reuseOp); + editLog.logEdit(getSetOwnerOp(cache, "g")); if (i % 2048 == 0) { LOG.info("thread[" + ii +"] edits=" + i); } @@ -623,10 +644,9 @@ public Void call() throws Exception { }); } - // the tx id is set while the edit log monitor is held, so this will - // effectively stall the async processing thread which will cause the - // edit queue to fill up. - final FSEditLogOp blockingOp = Mockito.spy(op); + // doEditTransaction is called while the edit log monitor is held, so this + // will effectively stall the async processing thread which will cause + // the edit queue to fill up. doAnswer( new Answer() { @Override @@ -640,9 +660,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return null; } } - ).when(blockingOp).setTransactionId(Mockito.anyLong()); - // don't reset fields so instance can be reused. - Mockito.doNothing().when(blockingOp).reset(); + ).when(editLog).doEditTransaction(argThat(new BlockingOpMatcher())); // repeatedly overflow the queue and verify it doesn't deadlock. for (int i = 0; i < 8; i++) { @@ -650,10 +668,11 @@ public Void answer(InvocationOnMock invocation) throws Throwable { // spammers to overflow the edit queue, then waits for a permit // from blockerSemaphore that will be released at the bottom of // this loop. 
- Future blockingEdit = executor.submit(new Callable() { + Future blockingEdit = executor.submit(new Callable() { @Override public Void call() throws Exception { Thread.currentThread().setName("Log blocker"); + final FSEditLogOp blockingOp = getSetOwnerOp(cache, "b"); editLog.logEdit(blockingOp); editLog.logSync(); return null; @@ -684,7 +703,7 @@ public Boolean get() { // what log rolling does), unblock the op currently holding the // monitor, and ensure deadlock does not occur. final CountDownLatch readyLatch = new CountDownLatch(1); - Future synchedEdits = executor.submit(new Callable() { + Future synchedEdits = executor.submit(new Callable() { @Override public Void call() throws Exception { Thread.currentThread().setName("Log synchronizer"); @@ -692,7 +711,7 @@ public Void call() throws Exception { // log rolling to deadlock when queue is full. readyLatch.countDown(); synchronized (editLog) { - editLog.logEdit(reuseOp); + editLog.logEdit(getSetOwnerOp(cache, "g")); editLog.logSync(); } return null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index 8c330c54b89..e6007af1cec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -380,4 +380,16 @@ public static long setACStateId(DistributedFileSystem dfs, lastSeenStateId.set(stateId); return currentStateId; } + + /** + * Get last seen stateId from the client AlignmentContext. 
+ */ + public static long getLastSeenStateId(DistributedFileSystem dfs) + throws Exception { + ObserverReadProxyProvider provider = (ObserverReadProxyProvider) + ((RetryInvocationHandler) Proxy.getInvocationHandler( + dfs.getClient().getNamenode())).getProxyProvider(); + ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext()); + return ac.getLastSeenStateId(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index 36e2bc0f89e..a97f8a93084 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -29,13 +29,18 @@ import static org.mockito.Mockito.doAnswer; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; - +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileUtil; @@ -54,11 +59,13 @@ import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.TestFsck; import org.apache.hadoop.hdfs.tools.GetGroups; import 
org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -484,6 +491,118 @@ public void testStickyActive() throws Exception { } } + /** + * The test models the race of two mkdirs RPC calls on the same path to + * Active NameNode. The first arrived call will journal a mkdirs transaction. + * The subsequent call hitting the NameNode before the mkdirs transaction is + * synced will see that the directory already exists, but will obtain + * lastSeenStateId smaller than the txId of the mkdirs transaction + * since the latter hasn't been synced yet. + * This causes stale read from Observer for the second client. + * See HDFS-15915. + */ + @Test + public void testMkdirsRaceWithObserverRead() throws Exception { + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + dfsCluster.rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentTo(2); + + // Create a spy on FSEditLog, which delays MkdirOp transaction by 100 msec + FSEditLog spyEditLog = NameNodeAdapter.spyDelayMkDirTransaction( + dfsCluster.getNameNode(0), 100); + + final int numThreads = 4; + ClientState[] clientStates = new ClientState[numThreads]; + final ExecutorService threadPool = + HadoopExecutors.newFixedThreadPool(numThreads); + final Future[] futures = new Future[numThreads]; + + Configuration conf2 = new Configuration(conf); + // Disable FS cache so that different DFS clients are used + conf2.setBoolean("fs.hdfs.impl.disable.cache", true); + + for (int i = 0; i < numThreads; i++) { + clientStates[i] = new ClientState(); + futures[i] = threadPool.submit(new MkDirRunner(conf2, clientStates[i])); + } + + Thread.sleep(150); // wait until mkdir is logged + long activStateId = + dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(); + dfsCluster.rollEditLogAndTail(0); + boolean finished = true; + // wait 
for all dispatcher threads to finish + for (Future future : futures) { + try { + future.get(); + } catch (ExecutionException e) { + finished = false; + LOG.warn("MkDirRunner thread failed", e.getCause()); + } + } + assertTrue("Not all threads finished", finished); + threadPool.shutdown(); + + assertEquals("Active and Observer stateIds don't match", + dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(), + dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId()); + for (int i = 0; i < numThreads; i++) { + assertTrue("Client #" + i + + " lastSeenStateId=" + clientStates[i].lastSeenStateId + + " activStateId=" + activStateId + + "\n" + clientStates[i].fnfe, + clientStates[i].lastSeenStateId >= activStateId && + clientStates[i].fnfe == null); + } + + // Restore edit log + Mockito.reset(spyEditLog); + } + + static class ClientState { + private long lastSeenStateId = -7; + private FileNotFoundException fnfe; + } + + static class MkDirRunner implements Runnable { + private static final Path DIR_PATH = + new Path("/TestObserverNode/testMkdirsRaceWithObserverRead"); + + private DistributedFileSystem fs; + private ClientState clientState; + + MkDirRunner(Configuration conf, ClientState cs) throws IOException { + super(); + fs = (DistributedFileSystem) FileSystem.get(conf); + clientState = cs; + } + + @Override + public void run() { + try { + fs.mkdirs(DIR_PATH); + clientState.lastSeenStateId = HATestUtil.getLastSeenStateId(fs); + assertSentTo(fs, 0); + + FileStatus stat = fs.getFileStatus(DIR_PATH); + assertSentTo(fs, 2); + assertTrue("Should be a directory", stat.isDirectory()); + } catch (FileNotFoundException ioe) { + clientState.fnfe = ioe; + } catch (Exception e) { + fail("Unexpected exception: " + e); + } + } + } + + private static void assertSentTo(DistributedFileSystem fs, int nnIdx) + throws IOException { + assertTrue("Request was not sent to the expected namenode " + nnIdx, + HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx)); + 
} + private void assertSentTo(int nnIdx) throws IOException { assertTrue("Request was not sent to the expected namenode " + nnIdx, HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));