HDFS-15915. Race condition with async edits logging due to updating txId outside of the namesystem log. Contributed by Konstantin V Shvachko.

This commit is contained in:
Konstantin V Shvachko 2021-05-26 12:07:13 -07:00
parent bcaeb1ac8c
commit 1abd03d68f
8 changed files with 270 additions and 38 deletions

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
/**
* A generic abstract class to support journaling of edits logs into
@ -42,6 +43,16 @@ public EditLogOutputStream() throws IOException {
numSync = totalTimeSync = 0;
}
/**
* Get the last txId journalled in the stream.
* The txId is recorded when FSEditLogOp is written to the stream.
* The default implementation is dummy.
* JournalSet tracks the txId uniformly for all underlying streams.
*/
public long getLastJournalledTxId() {
return HdfsServerConstants.INVALID_TXID;
};
/**
* Write edits log operation to the stream.
*

View File

@ -218,7 +218,10 @@ private static class TransactionId {
private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() {
@Override
protected synchronized TransactionId initialValue() {
return new TransactionId(Long.MAX_VALUE);
// If an RPC call did not generate any transactions,
// logSync() should exit without syncing
// Therefore the initial value of myTransactionId should be 0
return new TransactionId(0L);
}
};
@ -463,6 +466,7 @@ assert isOpenForWrite() :
// wait if an automatic sync is scheduled
waitIfAutoSyncScheduled();
beginTransaction(op);
// check if it is time to schedule an automatic sync
needsSync = doEditTransaction(op);
if (needsSync) {
@ -477,9 +481,11 @@ assert isOpenForWrite() :
}
synchronized boolean doEditTransaction(final FSEditLogOp op) {
long start = beginTransaction();
op.setTransactionId(txid);
LOG.debug("doEditTx() op={} txid={}", op, txid);
assert op.hasTransactionId() :
"Transaction id is not set for " + op + " EditLog.txId=" + txid;
long start = monotonicNow();
try {
editLogStream.write(op);
} catch (IOException ex) {
@ -523,7 +529,7 @@ private boolean shouldForceSync() {
return editLogStream.shouldForceSync();
}
private long beginTransaction() {
protected void beginTransaction(final FSEditLogOp op) {
assert Thread.holdsLock(this);
// get a new transactionId
txid++;
@ -533,7 +539,9 @@ private long beginTransaction() {
//
TransactionId id = myTransactionId.get();
id.txid = txid;
return monotonicNow();
if(op != null) {
op.setTransactionId(txid);
}
}
private void endTransaction(long start) {
@ -650,7 +658,7 @@ public void logSync() {
}
protected void logSync(long mytxid) {
long syncStart = 0;
long lastJournalledTxId = HdfsServerConstants.INVALID_TXID;
boolean sync = false;
long editsBatchedInSync = 0;
try {
@ -677,8 +685,16 @@ protected void logSync(long mytxid) {
// now, this thread will do the sync. track if other edits were
// included in the sync - ie. batched. if this is the only edit
// synced then the batched count is 0
editsBatchedInSync = txid - synctxid - 1;
syncStart = txid;
lastJournalledTxId = editLogStream.getLastJournalledTxId();
LOG.debug("logSync(tx) synctxid={} lastJournalledTxId={} mytxid={}",
synctxid, lastJournalledTxId, mytxid);
assert lastJournalledTxId <= txid : "lastJournalledTxId exceeds txid";
// The stream has already been flushed, or there are no active streams
// We still try to flush up to mytxid
if(lastJournalledTxId <= synctxid) {
lastJournalledTxId = mytxid;
}
editsBatchedInSync = lastJournalledTxId - synctxid - 1;
isSyncRunning = true;
sync = true;
@ -738,7 +754,7 @@ protected void logSync(long mytxid) {
// Prevent RuntimeException from blocking other log edit sync
synchronized (this) {
if (sync) {
synctxid = syncStart;
synctxid = lastJournalledTxId;
for (JournalManager jm : journalSet.getJournalManagers()) {
/**
* {@link FileJournalManager#lastReadableTxId} is only meaningful
@ -746,7 +762,7 @@ protected void logSync(long mytxid) {
* other types of {@link JournalManager}.
*/
if (jm instanceof FileJournalManager) {
((FileJournalManager)jm).setLastReadableTxId(syncStart);
((FileJournalManager)jm).setLastReadableTxId(synctxid);
}
}
isSyncRunning = false;
@ -1618,7 +1634,8 @@ public synchronized void journal(long firstTxId, int numTxns, byte[] data) {
* store yet.
*/
synchronized void logEdit(final int length, final byte[] data) {
long start = beginTransaction();
beginTransaction(null);
long start = monotonicNow();
try {
editLogStream.writeRaw(data, 0, length);

View File

@ -125,9 +125,14 @@ public void close() {
@Override
void logEdit(final FSEditLogOp op) {
assert isOpenForWrite();
Edit edit = getEditInstance(op);
THREAD_EDIT.set(edit);
enqueueEdit(edit);
synchronized(this) {
enqueueEdit(edit);
beginTransaction(op);
}
}
@Override

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.INVALID_TXID;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
@ -187,9 +188,11 @@ public boolean isShared() {
final int minimumRedundantJournals;
private boolean closed;
private long lastJournalledTxId;
JournalSet(int minimumRedundantResources) {
this.minimumRedundantJournals = minimumRedundantResources;
lastJournalledTxId = INVALID_TXID;
}
@Override
@ -439,6 +442,16 @@ private class JournalSetOutputStream extends EditLogOutputStream {
super();
}
/**
* Get the last txId journalled in the stream.
* The txId is recorded when FSEditLogOp is written to the journal.
* JournalSet tracks the txId uniformly for all underlying streams.
*/
@Override
public long getLastJournalledTxId() {
return lastJournalledTxId;
}
@Override
public void write(final FSEditLogOp op)
throws IOException {
@ -450,6 +463,10 @@ public void apply(JournalAndStream jas) throws IOException {
}
}
}, "write op");
assert lastJournalledTxId < op.txid : "TxId order violation for op=" +
op + ", lastJournalledTxId=" + lastJournalledTxId;
lastJournalledTxId = op.txid;
}
@Override

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.io.File;
@ -55,7 +57,12 @@
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.Whitebox;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer.FSIMAGE_ATTRIBUTE_KEY;
/**
@ -318,7 +325,34 @@ public static FSEditLog spyOnEditLog(NameNode nn) {
}
return spyEditLog;
}
/**
* Spy on EditLog to delay execution of doEditTransaction() for MkdirOp.
*/
public static FSEditLog spyDelayMkDirTransaction(
final NameNode nn, final long delay) {
FSEditLog realEditLog = nn.getFSImage().getEditLog();
FSEditLogAsync spyEditLog = (FSEditLogAsync) spy(realEditLog);
DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
Answer<Boolean> ans = new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(delay);
return (Boolean) invocation.callRealMethod();
}
};
ArgumentMatcher<FSEditLogOp> am = new ArgumentMatcher<FSEditLogOp>() {
@Override
public boolean matches(FSEditLogOp argument) {
FSEditLogOp op = (FSEditLogOp) argument;
return op.opCode == FSEditLogOpCodes.OP_MKDIR;
}
};
doAnswer(ans).when(spyEditLog).doEditTransaction(
ArgumentMatchers.argThat(am));
return spyEditLog;
}
public static JournalSet spyOnJournalSet(NameNode nn) {
FSEditLog editLog = nn.getFSImage().getEditLog();
JournalSet js = Mockito.spy(editLog.getJournalSet());

View File

@ -17,9 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
@ -53,13 +55,15 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.mockito.Mockito;
import org.mockito.ArgumentMatcher;
import org.slf4j.event.Level;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -286,12 +290,12 @@ private long verifyEditLogs(FSNamesystem namesystem, FSImage fsimage,
File editFile = new File(sd.getCurrentDir(), logFileName);
System.out.println("Verifying file: " + editFile);
LOG.info("Verifying file: " + editFile);
FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId);
long numEditsThisLog = loader.loadFSEdits(
new EditLogFileInputStream(editFile), startTxId);
System.out.println("Number of edits: " + numEditsThisLog);
LOG.info("Number of edits: " + numEditsThisLog);
assertTrue(numEdits == -1 || numEditsThisLog == numEdits);
numEdits = numEditsThisLog;
}
@ -576,9 +580,29 @@ public void run() {
}
}
static SetOwnerOp getSetOwnerOp(OpInstanceCache cache, String group) {
return ((SetOwnerOp)cache.get(OP_SET_OWNER))
.setSource("/").setUser("u").setGroup(group);
}
static class BlockingOpMatcher implements ArgumentMatcher<FSEditLogOp> {
@Override
public boolean matches(FSEditLogOp o) {
if(o instanceof FSEditLogOp.SetOwnerOp) {
FSEditLogOp.SetOwnerOp op = (FSEditLogOp.SetOwnerOp)o;
if("b".equals(op.groupname)) {
LOG.info("Blocking op: " + op);
return true;
}
}
return false;
}
}
@Test(timeout=180000)
public void testDeadlock() throws Throwable {
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO);
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(FSEditLogAsync.LOG, Level.DEBUG);
Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
@ -591,21 +615,17 @@ public void testDeadlock() throws Throwable {
ExecutorService executor = Executors.newCachedThreadPool();
try {
final FSEditLog editLog = namesystem.getEditLog();
final FSEditLog editLog = spy(namesystem.getEditLog());
DFSTestUtil.setEditLogForTesting(namesystem, editLog);
FSEditLogOp.OpInstanceCache cache = editLog.cache.get();
final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache)
.setSource("/").setUser("u").setGroup("g");
// don't reset fields so instance can be reused.
final FSEditLogOp reuseOp = Mockito.spy(op);
Mockito.doNothing().when(reuseOp).reset();
final OpInstanceCache cache = editLog.cache.get();
// only job is spam edits. it will fill the queue when the test
// loop injects the blockingOp.
Future[] logSpammers = new Future[16];
Future<?>[] logSpammers = new Future<?>[16];
for (int i=0; i < logSpammers.length; i++) {
final int ii = i;
logSpammers[i] = executor.submit(new Callable() {
logSpammers[i] = executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log spammer " + ii);
@ -613,7 +633,7 @@ public Void call() throws Exception {
startSpamLatch.await();
for (int i = 0; !done.get() && i < 1000000; i++) {
// do not logSync here because we need to congest the queue.
editLog.logEdit(reuseOp);
editLog.logEdit(getSetOwnerOp(cache, "g"));
if (i % 2048 == 0) {
LOG.info("thread[" + ii +"] edits=" + i);
}
@ -624,10 +644,9 @@ public Void call() throws Exception {
});
}
// the tx id is set while the edit log monitor is held, so this will
// effectively stall the async processing thread which will cause the
// edit queue to fill up.
final FSEditLogOp blockingOp = Mockito.spy(op);
// doEditTransaction is set while the edit log monitor is held, so this
// will effectively stall the async processing thread which will cause
// the edit queue to fill up.
doAnswer(
new Answer<Void>() {
@Override
@ -641,9 +660,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
return null;
}
}
).when(blockingOp).setTransactionId(Mockito.anyLong());
// don't reset fields so instance can be reused.
Mockito.doNothing().when(blockingOp).reset();
).when(editLog).doEditTransaction(argThat(new BlockingOpMatcher()));
// repeatedly overflow the queue and verify it doesn't deadlock.
for (int i = 0; i < 8; i++) {
@ -651,10 +668,11 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
// spammers to overflow the edit queue, then waits for a permit
// from blockerSemaphore that will be released at the bottom of
// this loop.
Future blockingEdit = executor.submit(new Callable() {
Future<Void> blockingEdit = executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log blocker");
final FSEditLogOp blockingOp = getSetOwnerOp(cache, "b");
editLog.logEdit(blockingOp);
editLog.logSync();
return null;
@ -685,7 +703,7 @@ public Boolean get() {
// what log rolling does), unblock the op currently holding the
// monitor, and ensure deadlock does not occur.
CountDownLatch readyLatch = new CountDownLatch(1);
Future synchedEdits = executor.submit(new Callable() {
Future<Void> synchedEdits = executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log synchronizer");
@ -693,7 +711,7 @@ public Void call() throws Exception {
// log rolling to deadlock when queue is full.
readyLatch.countDown();
synchronized (editLog) {
editLog.logEdit(reuseOp);
editLog.logEdit(getSetOwnerOp(cache, "g"));
editLog.logSync();
}
return null;

View File

@ -372,4 +372,16 @@ public static long setACStateId(DistributedFileSystem dfs,
lastSeenStateId.accumulate(stateId);
return currentStateId;
}
/**
* Get last seen stateId from the client AlignmentContext.
*/
public static long getLastSeenStateId(DistributedFileSystem dfs)
throws Exception {
ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
dfs.getClient().getNamenode())).getProxyProvider();
ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext());
return ac.getLastSeenStateId();
}
}

View File

@ -29,13 +29,18 @@
import static org.mockito.Mockito.doAnswer;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@ -53,12 +58,14 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.apache.hadoop.hdfs.tools.GetGroups;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -489,6 +496,117 @@ public void testFsckDelete() throws Exception {
assertTrue(result.contains("The filesystem under path '/' is CORRUPT"));
}
/**
* The test models the race of two mkdirs RPC calls on the same path to
* Active NameNode. The first arrived call will journal a mkdirs transaction.
* The subsequent call hitting the NameNode before the mkdirs transaction is
* synced will see that the directory already exists, but will obtain
* lastSeenStateId smaller than the txId of the mkdirs transaction
* since the latter hasn't been synced yet.
* This causes stale read from Observer for the second client.
* See HDFS-15915.
*/
@Test
public void testMkdirsRaceWithObserverRead() throws Exception {
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2);
// Create a spy on FSEditLog, which delays MkdirOp transaction by 100 mec
FSEditLog spyEditLog = NameNodeAdapter.spyDelayMkDirTransaction(
dfsCluster.getNameNode(0), 100);
final int numThreads = 4;
ClientState[] clientStates = new ClientState[numThreads];
final ExecutorService threadPool =
HadoopExecutors.newFixedThreadPool(numThreads);
final Future<?>[] futures = new Future<?>[numThreads];
Configuration conf2 = new Configuration(conf);
// Disable FS cache so that different DFS clients are used
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
for (int i = 0; i < numThreads; i++) {
clientStates[i] = new ClientState();
futures[i] = threadPool.submit(new MkDirRunner(conf2, clientStates[i]));
}
Thread.sleep(150); // wait until mkdir is logged
long activStateId =
dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId();
dfsCluster.rollEditLogAndTail(0);
boolean finished = true;
// wait for all dispatcher threads to finish
for (Future<?> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
finished = false;
LOG.warn("MkDirRunner thread failed", e.getCause());
}
}
assertTrue("Not all threads finished", finished);
threadPool.shutdown();
assertEquals("Active and Observer stateIds don't match",
dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId());
for (int i = 0; i < numThreads; i++) {
assertTrue("Client #" + i
+ " lastSeenStateId=" + clientStates[i].lastSeenStateId
+ " activStateId=" + activStateId
+ "\n" + clientStates[i].fnfe,
clientStates[i].lastSeenStateId >= activStateId &&
clientStates[i].fnfe == null);
}
// Restore edit log
Mockito.reset(spyEditLog);
}
static class ClientState {
private long lastSeenStateId = -7;
private FileNotFoundException fnfe;
}
static class MkDirRunner implements Runnable {
private static final Path DIR_PATH =
new Path("/TestObserverNode/testMkdirsRaceWithObserverRead");
private DistributedFileSystem fs;
private ClientState clientState;
MkDirRunner(Configuration conf, ClientState cs) throws IOException {
super();
fs = (DistributedFileSystem) FileSystem.get(conf);
clientState = cs;
}
@Override
public void run() {
try {
fs.mkdirs(DIR_PATH);
clientState.lastSeenStateId = HATestUtil.getLastSeenStateId(fs);
assertSentTo(fs, 0);
FileStatus stat = fs.getFileStatus(DIR_PATH);
assertSentTo(fs, 2);
assertTrue("Should be a directory", stat.isDirectory());
} catch (FileNotFoundException ioe) {
clientState.fnfe = ioe;
} catch (Exception e) {
fail("Unexpected exception: " + e);
}
}
}
private static void assertSentTo(DistributedFileSystem fs, int nnIdx)
throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx));
}
private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,