diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4c502fae9a7..c254992f495 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -564,6 +564,9 @@ Release 2.6.0 - UNRELEASED HDFS-7181. Remove incorrect precondition check on key length in FileEncryptionInfo. (wang) + HDFS-7203. Concurrent appending to the same file can cause data corruption + (kihwal) + BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS HDFS-6387. HDFS CLI admin tool for creating & deleting an diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index f54d325b71f..362c62de92c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1666,7 +1666,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } return null; } - return callAppend(stat, src, buffersize, progress); + return callAppend(src, buffersize, progress); } return null; } @@ -1738,7 +1738,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } /** Method to get stream returned by append call */ - private DFSOutputStream callAppend(HdfsFileStatus stat, String src, + private DFSOutputStream callAppend(String src, int buffersize, Progressable progress) throws IOException { LocatedBlock lastBlock = null; try { @@ -1752,8 +1752,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, UnresolvedPathException.class, SnapshotAccessControlException.class); } + HdfsFileStatus newStat = getFileInfo(src); return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress, - lastBlock, stat, dfsClientConf.createChecksum()); + lastBlock, newStat, dfsClientConf.createChecksum()); } /** @@ -1777,12 +1778,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private DFSOutputStream append(String src, int buffersize, Progressable progress) throws IOException { checkOpen(); - HdfsFileStatus stat = getFileInfo(src); - if (stat == null) { // No file found - throw new FileNotFoundException("failed to append to non-existent file " - + src + " on client " + clientName); - } - final DFSOutputStream result = callAppend(stat, src, buffersize, progress); + final DFSOutputStream result = callAppend(src, buffersize, progress); beginFileLease(result.getFileId(), result); return result; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java index 66a04e78279..d5de0ff4d09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java @@ -25,15 +25,23 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import org.mockito.invocation.InvocationOnMock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import org.mockito.stubbing.Answer; + import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSClientAdapter; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -361,4 +369,79 @@ public class TestFileAppend3 { AppendTestUtil.checkFullFile(fs, p, fileLen, fileContents, "Failed to append to a partial chunk"); } + + // Do small appends. + void doSmallAppends(Path file, DistributedFileSystem fs, int iterations) + throws IOException { + for (int i = 0; i < iterations; i++) { + FSDataOutputStream stm; + try { + stm = fs.append(file); + } catch (IOException e) { + // If another thread is already appending, skip this time. + continue; + } + // Failure in write or close will be terminal. + AppendTestUtil.write(stm, 0, 123); + stm.close(); + } + } + + + @Test + public void testSmallAppendRace() throws Exception { + final Path file = new Path("/testSmallAppendRace"); + final String fName = file.toUri().getPath(); + + // Create the file and write a small amount of data. + FSDataOutputStream stm = fs.create(file); + AppendTestUtil.write(stm, 0, 123); + stm.close(); + + // Introduce a delay between getFileInfo and calling append() against NN. + final DFSClient client = DFSClientAdapter.getDFSClient(fs); + DFSClient spyClient = spy(client); + when(spyClient.getFileInfo(fName)).thenAnswer(new Answer() { + @Override + public HdfsFileStatus answer(InvocationOnMock invocation){ + try { + HdfsFileStatus stat = client.getFileInfo(fName); + Thread.sleep(100); + return stat; + } catch (Exception e) { + return null; + } + } + }); + + DFSClientAdapter.setDFSClient(fs, spyClient); + + // Create two threads for doing appends to the same file. + Thread worker1 = new Thread() { + @Override + public void run() { + try { + doSmallAppends(file, fs, 20); + } catch (IOException e) { + } + } + }; + + Thread worker2 = new Thread() { + @Override + public void run() { + try { + doSmallAppends(file, fs, 20); + } catch (IOException e) { + } + } + }; + + worker1.start(); + worker2.start(); + + // append will fail when the file size crosses the checksum chunk boundary, + // if append was called with a stale file stat. + doSmallAppends(file, fs, 20); + } }