HDFS-15915. Race condition with async edits logging due to updating txId outside of the namesystem log. Contributed by Konstantin V Shvachko.

(cherry picked from commit 1abd03d68f)

# Conflicts:
#	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
#	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
#	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
This commit is contained in:
Konstantin V Shvachko 2021-05-26 12:07:13 -07:00
parent 76eea30bdd
commit 8715a18f95
8 changed files with 276 additions and 38 deletions

View File

@ -24,6 +24,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
/** /**
* A generic abstract class to support journaling of edits logs into * A generic abstract class to support journaling of edits logs into
@ -40,6 +41,16 @@ public abstract class EditLogOutputStream implements Closeable {
numSync = totalTimeSync = 0; numSync = totalTimeSync = 0;
} }
/**
* Get the last txId journalled in the stream.
* The txId is recorded when FSEditLogOp is written to the stream.
* The default implementation is dummy.
* JournalSet tracks the txId uniformly for all underlying streams.
*/
public long getLastJournalledTxId() {
return HdfsServerConstants.INVALID_TXID;
};
/** /**
* Write edits log operation to the stream. * Write edits log operation to the stream.
* *

View File

@ -213,7 +213,10 @@ public class FSEditLog implements LogsPurgeable {
private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() { private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() {
@Override @Override
protected synchronized TransactionId initialValue() { protected synchronized TransactionId initialValue() {
return new TransactionId(Long.MAX_VALUE); // If an RPC call did not generate any transactions,
// logSync() should exit without syncing
// Therefore the initial value of myTransactionId should be 0
return new TransactionId(0L);
} }
}; };
@ -456,6 +459,7 @@ public class FSEditLog implements LogsPurgeable {
// wait if an automatic sync is scheduled // wait if an automatic sync is scheduled
waitIfAutoSyncScheduled(); waitIfAutoSyncScheduled();
beginTransaction(op);
// check if it is time to schedule an automatic sync // check if it is time to schedule an automatic sync
needsSync = doEditTransaction(op); needsSync = doEditTransaction(op);
if (needsSync) { if (needsSync) {
@ -470,9 +474,13 @@ public class FSEditLog implements LogsPurgeable {
} }
synchronized boolean doEditTransaction(final FSEditLogOp op) { synchronized boolean doEditTransaction(final FSEditLogOp op) {
long start = beginTransaction(); if (LOG.isDebugEnabled()) {
op.setTransactionId(txid); LOG.debug("doEditTx() op=" + op + " txid=" + txid);
}
assert op.hasTransactionId() :
"Transaction id is not set for " + op + " EditLog.txId=" + txid;
long start = monotonicNow();
try { try {
editLogStream.write(op); editLogStream.write(op);
} catch (IOException ex) { } catch (IOException ex) {
@ -516,7 +524,7 @@ public class FSEditLog implements LogsPurgeable {
return editLogStream.shouldForceSync(); return editLogStream.shouldForceSync();
} }
private long beginTransaction() { protected void beginTransaction(final FSEditLogOp op) {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
// get a new transactionId // get a new transactionId
txid++; txid++;
@ -526,7 +534,9 @@ public class FSEditLog implements LogsPurgeable {
// //
TransactionId id = myTransactionId.get(); TransactionId id = myTransactionId.get();
id.txid = txid; id.txid = txid;
return monotonicNow(); if(op != null) {
op.setTransactionId(txid);
}
} }
private void endTransaction(long start) { private void endTransaction(long start) {
@ -643,7 +653,7 @@ public class FSEditLog implements LogsPurgeable {
} }
protected void logSync(long mytxid) { protected void logSync(long mytxid) {
long syncStart = 0; long lastJournalledTxId = HdfsServerConstants.INVALID_TXID;
boolean sync = false; boolean sync = false;
long editsBatchedInSync = 0; long editsBatchedInSync = 0;
try { try {
@ -670,8 +680,18 @@ public class FSEditLog implements LogsPurgeable {
// now, this thread will do the sync. track if other edits were // now, this thread will do the sync. track if other edits were
// included in the sync - ie. batched. if this is the only edit // included in the sync - ie. batched. if this is the only edit
// synced then the batched count is 0 // synced then the batched count is 0
editsBatchedInSync = txid - synctxid - 1; lastJournalledTxId = editLogStream.getLastJournalledTxId();
syncStart = txid; if (LOG.isDebugEnabled()) {
LOG.debug("logSync(tx) synctxid=" + " lastJournalledTxId=" +
lastJournalledTxId + " mytxid=" + mytxid);
}
assert lastJournalledTxId <= txid : "lastJournalledTxId exceeds txid";
// The stream has already been flushed, or there are no active streams
// We still try to flush up to mytxid
if(lastJournalledTxId <= synctxid) {
lastJournalledTxId = mytxid;
}
editsBatchedInSync = lastJournalledTxId - synctxid - 1;
isSyncRunning = true; isSyncRunning = true;
sync = true; sync = true;
@ -731,7 +751,7 @@ public class FSEditLog implements LogsPurgeable {
// Prevent RuntimeException from blocking other log edit sync // Prevent RuntimeException from blocking other log edit sync
synchronized (this) { synchronized (this) {
if (sync) { if (sync) {
synctxid = syncStart; synctxid = lastJournalledTxId;
for (JournalManager jm : journalSet.getJournalManagers()) { for (JournalManager jm : journalSet.getJournalManagers()) {
/** /**
* {@link FileJournalManager#lastReadableTxId} is only meaningful * {@link FileJournalManager#lastReadableTxId} is only meaningful
@ -739,7 +759,7 @@ public class FSEditLog implements LogsPurgeable {
* other types of {@link JournalManager}. * other types of {@link JournalManager}.
*/ */
if (jm instanceof FileJournalManager) { if (jm instanceof FileJournalManager) {
((FileJournalManager)jm).setLastReadableTxId(syncStart); ((FileJournalManager)jm).setLastReadableTxId(synctxid);
} }
} }
isSyncRunning = false; isSyncRunning = false;
@ -1501,7 +1521,8 @@ public class FSEditLog implements LogsPurgeable {
* store yet. * store yet.
*/ */
synchronized void logEdit(final int length, final byte[] data) { synchronized void logEdit(final int length, final byte[] data) {
long start = beginTransaction(); beginTransaction(null);
long start = monotonicNow();
try { try {
editLogStream.writeRaw(data, 0, length); editLogStream.writeRaw(data, 0, length);

View File

@ -115,9 +115,14 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
@Override @Override
void logEdit(final FSEditLogOp op) { void logEdit(final FSEditLogOp op) {
assert isOpenForWrite();
Edit edit = getEditInstance(op); Edit edit = getEditInstance(op);
threadEdit.set(edit); threadEdit.set(edit);
enqueueEdit(edit); synchronized(this) {
enqueueEdit(edit);
beginTransaction(op);
}
} }
@Override @Override

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.INVALID_TXID;
import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException; import java.io.IOException;
@ -194,9 +195,11 @@ public class JournalSet implements JournalManager {
final int minimumRedundantJournals; final int minimumRedundantJournals;
private boolean closed; private boolean closed;
private long lastJournalledTxId;
JournalSet(int minimumRedundantResources) { JournalSet(int minimumRedundantResources) {
this.minimumRedundantJournals = minimumRedundantResources; this.minimumRedundantJournals = minimumRedundantResources;
lastJournalledTxId = INVALID_TXID;
} }
@Override @Override
@ -446,6 +449,16 @@ public class JournalSet implements JournalManager {
super(); super();
} }
/**
* Get the last txId journalled in the stream.
* The txId is recorded when FSEditLogOp is written to the journal.
* JournalSet tracks the txId uniformly for all underlying streams.
*/
@Override
public long getLastJournalledTxId() {
return lastJournalledTxId;
}
@Override @Override
public void write(final FSEditLogOp op) public void write(final FSEditLogOp op)
throws IOException { throws IOException {
@ -457,6 +470,10 @@ public class JournalSet implements JournalManager {
} }
} }
}, "write op"); }, "write op");
assert lastJournalledTxId < op.txid : "TxId order violation for op=" +
op + ", lastJournalledTxId=" + lastJournalledTxId;
lastJournalledTxId = op.txid;
} }
@Override @Override

View File

@ -21,6 +21,8 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import java.io.File; import java.io.File;
@ -53,8 +55,13 @@ import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox; import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer.FSIMAGE_ATTRIBUTE_KEY; import static org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer.FSIMAGE_ATTRIBUTE_KEY;
/** /**
@ -306,6 +313,33 @@ public class NameNodeAdapter {
return spyEditLog; return spyEditLog;
} }
/**
* Spy on EditLog to delay execution of doEditTransaction() for MkdirOp.
*/
public static FSEditLog spyDelayMkDirTransaction(
final NameNode nn, final long delay) {
FSEditLog realEditLog = nn.getFSImage().getEditLog();
FSEditLogAsync spyEditLog = (FSEditLogAsync) spy(realEditLog);
DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
Answer<Boolean> ans = new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(delay);
return (Boolean) invocation.callRealMethod();
}
};
ArgumentMatcher<FSEditLogOp> am = new ArgumentMatcher<FSEditLogOp>() {
@Override
public boolean matches(Object argument) {
FSEditLogOp op = (FSEditLogOp) argument;
return op.opCode == FSEditLogOpCodes.OP_MKDIR;
}
};
doAnswer(ans).when(spyEditLog).doEditTransaction(
Matchers.argThat(am));
return spyEditLog;
}
public static JournalSet spyOnJournalSet(NameNode nn) { public static JournalSet spyOnJournalSet(NameNode nn) {
FSEditLog editLog = nn.getFSImage().getEditLog(); FSEditLog editLog = nn.getFSImage().getEditLog();
JournalSet js = Mockito.spy(editLog.getJournalSet()); JournalSet js = Mockito.spy(editLog.getJournalSet());

View File

@ -17,9 +17,11 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -53,6 +55,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream; import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -61,6 +65,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.ArgumentMatcher;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -285,12 +290,12 @@ public class TestEditLogRace {
File editFile = new File(sd.getCurrentDir(), logFileName); File editFile = new File(sd.getCurrentDir(), logFileName);
System.out.println("Verifying file: " + editFile); LOG.info("Verifying file: " + editFile);
FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId); FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId);
long numEditsThisLog = loader.loadFSEdits( long numEditsThisLog = loader.loadFSEdits(
new EditLogFileInputStream(editFile), startTxId); new EditLogFileInputStream(editFile), startTxId);
System.out.println("Number of edits: " + numEditsThisLog); LOG.info("Number of edits: " + numEditsThisLog);
assertTrue(numEdits == -1 || numEditsThisLog == numEdits); assertTrue(numEdits == -1 || numEditsThisLog == numEdits);
numEdits = numEditsThisLog; numEdits = numEditsThisLog;
} }
@ -575,9 +580,29 @@ public class TestEditLogRace {
} }
} }
static SetOwnerOp getSetOwnerOp(OpInstanceCache cache, String group) {
return ((SetOwnerOp)cache.get(OP_SET_OWNER))
.setSource("/").setUser("u").setGroup(group);
}
static class BlockingOpMatcher extends ArgumentMatcher<FSEditLogOp> {
@Override
public boolean matches(Object o) {
if(o instanceof FSEditLogOp.SetOwnerOp) {
FSEditLogOp.SetOwnerOp op = (FSEditLogOp.SetOwnerOp)o;
if("b".equals(op.groupname)) {
LOG.info("Blocking op: " + op);
return true;
}
}
return false;
}
}
@Test(timeout=180000) @Test(timeout=180000)
public void testDeadlock() throws Throwable { public void testDeadlock() throws Throwable {
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO); GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(FSEditLogAsync.LOG, Level.DEBUG);
Configuration conf = getConf(); Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE); NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
@ -590,21 +615,17 @@ public class TestEditLogRace {
ExecutorService executor = Executors.newCachedThreadPool(); ExecutorService executor = Executors.newCachedThreadPool();
try { try {
final FSEditLog editLog = namesystem.getEditLog(); final FSEditLog editLog = spy(namesystem.getEditLog());
DFSTestUtil.setEditLogForTesting(namesystem, editLog);
FSEditLogOp.OpInstanceCache cache = editLog.cache.get(); final OpInstanceCache cache = editLog.cache.get();
final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache)
.setSource("/").setUser("u").setGroup("g");
// don't reset fields so instance can be reused.
final FSEditLogOp reuseOp = Mockito.spy(op);
Mockito.doNothing().when(reuseOp).reset();
// only job is spam edits. it will fill the queue when the test // only job is spam edits. it will fill the queue when the test
// loop injects the blockingOp. // loop injects the blockingOp.
Future[] logSpammers = new Future[16]; Future<?>[] logSpammers = new Future<?>[16];
for (int i=0; i < logSpammers.length; i++) { for (int i=0; i < logSpammers.length; i++) {
final int ii = i; final int ii = i;
logSpammers[i] = executor.submit(new Callable() { logSpammers[i] = executor.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
Thread.currentThread().setName("Log spammer " + ii); Thread.currentThread().setName("Log spammer " + ii);
@ -612,7 +633,7 @@ public class TestEditLogRace {
startSpamLatch.await(); startSpamLatch.await();
for (int i = 0; !done.get() && i < 1000000; i++) { for (int i = 0; !done.get() && i < 1000000; i++) {
// do not logSync here because we need to congest the queue. // do not logSync here because we need to congest the queue.
editLog.logEdit(reuseOp); editLog.logEdit(getSetOwnerOp(cache, "g"));
if (i % 2048 == 0) { if (i % 2048 == 0) {
LOG.info("thread[" + ii +"] edits=" + i); LOG.info("thread[" + ii +"] edits=" + i);
} }
@ -623,10 +644,9 @@ public class TestEditLogRace {
}); });
} }
// the tx id is set while the edit log monitor is held, so this will // doEditTransaction is set while the edit log monitor is held, so this
// effectively stall the async processing thread which will cause the // will effectively stall the async processing thread which will cause
// edit queue to fill up. // the edit queue to fill up.
final FSEditLogOp blockingOp = Mockito.spy(op);
doAnswer( doAnswer(
new Answer<Void>() { new Answer<Void>() {
@Override @Override
@ -640,9 +660,7 @@ public class TestEditLogRace {
return null; return null;
} }
} }
).when(blockingOp).setTransactionId(Mockito.anyLong()); ).when(editLog).doEditTransaction(argThat(new BlockingOpMatcher()));
// don't reset fields so instance can be reused.
Mockito.doNothing().when(blockingOp).reset();
// repeatedly overflow the queue and verify it doesn't deadlock. // repeatedly overflow the queue and verify it doesn't deadlock.
for (int i = 0; i < 8; i++) { for (int i = 0; i < 8; i++) {
@ -650,10 +668,11 @@ public class TestEditLogRace {
// spammers to overflow the edit queue, then waits for a permit // spammers to overflow the edit queue, then waits for a permit
// from blockerSemaphore that will be released at the bottom of // from blockerSemaphore that will be released at the bottom of
// this loop. // this loop.
Future blockingEdit = executor.submit(new Callable() { Future<Void> blockingEdit = executor.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
Thread.currentThread().setName("Log blocker"); Thread.currentThread().setName("Log blocker");
final FSEditLogOp blockingOp = getSetOwnerOp(cache, "b");
editLog.logEdit(blockingOp); editLog.logEdit(blockingOp);
editLog.logSync(); editLog.logSync();
return null; return null;
@ -684,7 +703,7 @@ public class TestEditLogRace {
// what log rolling does), unblock the op currently holding the // what log rolling does), unblock the op currently holding the
// monitor, and ensure deadlock does not occur. // monitor, and ensure deadlock does not occur.
final CountDownLatch readyLatch = new CountDownLatch(1); final CountDownLatch readyLatch = new CountDownLatch(1);
Future synchedEdits = executor.submit(new Callable() { Future<Void> synchedEdits = executor.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
Thread.currentThread().setName("Log synchronizer"); Thread.currentThread().setName("Log synchronizer");
@ -692,7 +711,7 @@ public class TestEditLogRace {
// log rolling to deadlock when queue is full. // log rolling to deadlock when queue is full.
readyLatch.countDown(); readyLatch.countDown();
synchronized (editLog) { synchronized (editLog) {
editLog.logEdit(reuseOp); editLog.logEdit(getSetOwnerOp(cache, "g"));
editLog.logSync(); editLog.logSync();
} }
return null; return null;

View File

@ -380,4 +380,16 @@ public abstract class HATestUtil {
lastSeenStateId.set(stateId); lastSeenStateId.set(stateId);
return currentStateId; return currentStateId;
} }
/**
* Get last seen stateId from the client AlignmentContext.
*/
public static long getLastSeenStateId(DistributedFileSystem dfs)
throws Exception {
ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
dfs.getClient().getNamenode())).getProxyProvider();
ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext());
return ac.getLastSeenStateId();
}
} }

View File

@ -29,13 +29,18 @@ import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
@ -54,11 +59,13 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.TestFsck; import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.apache.hadoop.hdfs.tools.GetGroups; import org.apache.hadoop.hdfs.tools.GetGroups;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -484,6 +491,118 @@ public class TestObserverNode {
} }
} }
/**
* The test models the race of two mkdirs RPC calls on the same path to
* Active NameNode. The first arrived call will journal a mkdirs transaction.
* The subsequent call hitting the NameNode before the mkdirs transaction is
* synced will see that the directory already exists, but will obtain
* lastSeenStateId smaller than the txId of the mkdirs transaction
* since the latter hasn't been synced yet.
* This causes stale read from Observer for the second client.
* See HDFS-15915.
*/
@Test
public void testMkdirsRaceWithObserverRead() throws Exception {
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2);
// Create a spy on FSEditLog, which delays MkdirOp transaction by 100 mec
FSEditLog spyEditLog = NameNodeAdapter.spyDelayMkDirTransaction(
dfsCluster.getNameNode(0), 100);
final int numThreads = 4;
ClientState[] clientStates = new ClientState[numThreads];
final ExecutorService threadPool =
HadoopExecutors.newFixedThreadPool(numThreads);
final Future<?>[] futures = new Future<?>[numThreads];
Configuration conf2 = new Configuration(conf);
// Disable FS cache so that different DFS clients are used
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
for (int i = 0; i < numThreads; i++) {
clientStates[i] = new ClientState();
futures[i] = threadPool.submit(new MkDirRunner(conf2, clientStates[i]));
}
Thread.sleep(150); // wait until mkdir is logged
long activStateId =
dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId();
dfsCluster.rollEditLogAndTail(0);
boolean finished = true;
// wait for all dispatcher threads to finish
for (Future<?> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
finished = false;
LOG.warn("MkDirRunner thread failed", e.getCause());
}
}
assertTrue("Not all threads finished", finished);
threadPool.shutdown();
assertEquals("Active and Observer stateIds don't match",
dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId());
for (int i = 0; i < numThreads; i++) {
assertTrue("Client #" + i
+ " lastSeenStateId=" + clientStates[i].lastSeenStateId
+ " activStateId=" + activStateId
+ "\n" + clientStates[i].fnfe,
clientStates[i].lastSeenStateId >= activStateId &&
clientStates[i].fnfe == null);
}
// Restore edit log
Mockito.reset(spyEditLog);
}
static class ClientState {
private long lastSeenStateId = -7;
private FileNotFoundException fnfe;
}
static class MkDirRunner implements Runnable {
private static final Path DIR_PATH =
new Path("/TestObserverNode/testMkdirsRaceWithObserverRead");
private DistributedFileSystem fs;
private ClientState clientState;
MkDirRunner(Configuration conf, ClientState cs) throws IOException {
super();
fs = (DistributedFileSystem) FileSystem.get(conf);
clientState = cs;
}
@Override
public void run() {
try {
fs.mkdirs(DIR_PATH);
clientState.lastSeenStateId = HATestUtil.getLastSeenStateId(fs);
assertSentTo(fs, 0);
FileStatus stat = fs.getFileStatus(DIR_PATH);
assertSentTo(fs, 2);
assertTrue("Should be a directory", stat.isDirectory());
} catch (FileNotFoundException ioe) {
clientState.fnfe = ioe;
} catch (Exception e) {
fail("Unexpected exception: " + e);
}
}
}
private static void assertSentTo(DistributedFileSystem fs, int nnIdx)
throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx));
}
private void assertSentTo(int nnIdx) throws IOException { private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx, assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx)); HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));