HDFS-7203. Concurrent appending to the same file can cause data corruption. Contributed by Kihwal Lee.
(cherry picked from commit 853cb704ed)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Kihwal Lee 2014-10-08 15:10:03 -05:00
parent 088ae9c5bf
commit 522b6505f4
3 changed files with 91 additions and 9 deletions
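For readers skimming the change, the failure mode is a client-side race: DFSClient.append() used to fetch the file's HdfsFileStatus before issuing the append RPC, so a concurrent appender could grow the file in between and the new stream would be set up from a stale length, which can corrupt data once the real size crosses a checksum chunk boundary. The sketch below only illustrates the kind of workload involved (several writers doing small appends to one file, mirroring the test added in this commit); the class name, path, and byte counts are made up for illustration, and it assumes fs.defaultFS points at an HDFS cluster.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConcurrentAppendSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes fs.defaultFS points at an HDFS cluster; local file systems
    // generally do not support append().
    final FileSystem fs = FileSystem.get(conf);
    final Path file = new Path("/tmp/concurrent-append-sketch"); // hypothetical path

    // Seed the file with a partial checksum chunk, as the new test does.
    try (FSDataOutputStream out = fs.create(file, true)) {
      out.write(new byte[123]);
    }

    // Two threads issuing small appends to the same file; on an unpatched
    // client this is the pattern that can leave the file corrupted.
    Runnable appender = () -> {
      for (int i = 0; i < 20; i++) {
        try (FSDataOutputStream out = fs.append(file)) {
          out.write(new byte[123]);
        } catch (IOException e) {
          // Another writer holds the lease right now; skip this round.
        }
      }
    };
    Thread t1 = new Thread(appender);
    Thread t2 = new Thread(appender);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }
}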

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -542,6 +542,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7172. Test data files may be checked out of git with incorrect line
     endings, causing test failures in TestHDFSCLI. (Chris Nauroth via wheat9)
 
+    HDFS-7203. Concurrent appending to the same file can cause data corruption
+    (kihwal)
+
   BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
 
     HDFS-6387. HDFS CLI admin tool for creating & deleting an

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -1651,7 +1651,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
         }
         return null;
       }
-      return callAppend(stat, src, buffersize, progress);
+      return callAppend(src, buffersize, progress);
     }
     return null;
   }
@@ -1723,7 +1723,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   /** Method to get stream returned by append call */
-  private DFSOutputStream callAppend(HdfsFileStatus stat, String src,
+  private DFSOutputStream callAppend(String src,
       int buffersize, Progressable progress) throws IOException {
     LocatedBlock lastBlock = null;
     try {
@@ -1737,8 +1737,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
                                      UnresolvedPathException.class,
                                      SnapshotAccessControlException.class);
     }
+    HdfsFileStatus newStat = getFileInfo(src);
     return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
-        lastBlock, stat, dfsClientConf.createChecksum());
+        lastBlock, newStat, dfsClientConf.createChecksum());
   }
 
   /**
@@ -1762,12 +1763,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private DFSOutputStream append(String src, int buffersize, Progressable progress)
       throws IOException {
     checkOpen();
-    HdfsFileStatus stat = getFileInfo(src);
-    if (stat == null) { // No file found
-      throw new FileNotFoundException("failed to append to non-existent file "
-          + src + " on client " + clientName);
-    }
-    final DFSOutputStream result = callAppend(stat, src, buffersize, progress);
+    final DFSOutputStream result = callAppend(src, buffersize, progress);
     beginFileLease(result.getFileId(), result);
     return result;
   }
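Read together, the three DFSClient hunks move the getFileInfo() call from append() into callAppend(), after the NameNode append RPC has succeeded; since that RPC grants the lease to this client, the status fetched afterwards is consistent with the returned last block. A condensed view of the patched flow might look like the fragment below; it is not a compilable unit on its own, the retry and exception-unwrapping code is elided, and the namenode.append(src, clientName) call is an assumption about what the elided try block does.

// Condensed sketch of the patched callAppend() flow (not the full method).
private DFSOutputStream callAppend(String src, int buffersize,
    Progressable progress) throws IOException {
  // 1. Ask the NameNode to reopen the file for append. This grants the
  //    lease to this client, so no other writer can change the file after
  //    this point.
  LocatedBlock lastBlock = namenode.append(src, clientName);
  // 2. Only now fetch the file status, so its length is consistent with
  //    lastBlock instead of being a value captured before the lease was held.
  HdfsFileStatus newStat = getFileInfo(src);
  // 3. Build the append stream from the consistent (lastBlock, newStat) pair.
  return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
      lastBlock, newStat, dfsClientConf.createChecksum());
}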

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java

@@ -25,15 +25,23 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
+import org.mockito.invocation.InvocationOnMock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import org.mockito.stubbing.Answer;
+
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClientAdapter;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -361,4 +369,79 @@ public class TestFileAppend3 {
     AppendTestUtil.checkFullFile(fs, p, fileLen,
         fileContents, "Failed to append to a partial chunk");
   }
+
+  // Do small appends.
+  void doSmallAppends(Path file, DistributedFileSystem fs, int iterations)
+      throws IOException {
+    for (int i = 0; i < iterations; i++) {
+      FSDataOutputStream stm;
+      try {
+        stm = fs.append(file);
+      } catch (IOException e) {
+        // If another thread is already appending, skip this time.
+        continue;
+      }
+      // Failure in write or close will be terminal.
+      AppendTestUtil.write(stm, 0, 123);
+      stm.close();
+    }
+  }
+
+  @Test
+  public void testSmallAppendRace() throws Exception {
+    final Path file = new Path("/testSmallAppendRace");
+    final String fName = file.toUri().getPath();
+
+    // Create the file and write a small amount of data.
+    FSDataOutputStream stm = fs.create(file);
+    AppendTestUtil.write(stm, 0, 123);
+    stm.close();
+
+    // Introduce a delay between getFileInfo and calling append() against NN.
+    final DFSClient client = DFSClientAdapter.getDFSClient(fs);
+    DFSClient spyClient = spy(client);
+    when(spyClient.getFileInfo(fName)).thenAnswer(new Answer<HdfsFileStatus>() {
+      @Override
+      public HdfsFileStatus answer(InvocationOnMock invocation) {
+        try {
+          HdfsFileStatus stat = client.getFileInfo(fName);
+          Thread.sleep(100);
+          return stat;
+        } catch (Exception e) {
+          return null;
+        }
+      }
+    });
+    DFSClientAdapter.setDFSClient(fs, spyClient);
+
+    // Create two threads for doing appends to the same file.
+    Thread worker1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          doSmallAppends(file, fs, 20);
+        } catch (IOException e) {
+        }
+      }
+    };
+
+    Thread worker2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          doSmallAppends(file, fs, 20);
+        } catch (IOException e) {
+        }
+      }
+    };
+
+    worker1.start();
+    worker2.start();
+
+    // Append will fail when the file size crosses the checksum chunk boundary,
+    // if append was called with a stale file stat.
+    doSmallAppends(file, fs, 20);
+  }
 }
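A note on the numbers in the test: each small append writes 123 bytes (assuming AppendTestUtil.write's third argument is a byte count), and with the default dfs.bytes-per-checksum of 512 the file length crosses a checksum chunk boundary within a handful of appends, which is exactly the situation where an append opened with a stale file stat goes wrong. The standalone sketch below, not part of the patch, just walks that arithmetic.

// Shows when successive 123-byte appends cross a 512-byte checksum chunk
// boundary; crossing happens on the fifth append (492 -> 615 bytes).
public class ChunkBoundarySketch {
  public static void main(String[] args) {
    final int bytesPerChecksum = 512; // HDFS default for dfs.bytes-per-checksum
    final int appendSize = 123;       // matches AppendTestUtil.write(stm, 0, 123)
    int length = 0;
    for (int i = 1; i <= 6; i++) {
      int before = length;
      length += appendSize;
      boolean crossesBoundary =
          before / bytesPerChecksum != length / bytesPerChecksum;
      System.out.printf("append %d: %d -> %d bytes, crosses chunk boundary: %b%n",
          i, before, length, crossesBoundary);
    }
  }
}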