HDFS-7203. Concurrent appending to the same file can cause data corruption. Contributed by Kihwal Lee.
Kihwal Lee 2014-10-08 15:05:13 -05:00
parent d218ab58fc
commit b39c8c3128
3 changed files with 91 additions and 9 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -564,6 +564,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7181. Remove incorrect precondition check on key length in
     FileEncryptionInfo. (wang)
 
+    HDFS-7203. Concurrent appending to the same file can cause data corruption
+    (kihwal)
+
   BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
 
     HDFS-6387. HDFS CLI admin tool for creating & deleting an
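
Why the corruption can happen: before this patch, DFSClient#append fetched the file status via getFileInfo() first and only then issued the append RPC to the NameNode. Another client could complete its own append between those two calls, so the stream was set up from a stale length and the partial checksum chunk was positioned wrongly. The following minimal, self-contained Java sketch models that check-then-act race; every name in it (StaleStatRace, appendChunk, fileLen) is a hypothetical stand-in, not HDFS code.

import java.util.concurrent.atomic.AtomicInteger;

public class StaleStatRace {
  // Stand-in for the file length the NameNode would report.
  private static final AtomicInteger fileLen = new AtomicInteger(0);

  static void appendChunk(String who, int bytes) throws InterruptedException {
    int statLen = fileLen.get();            // "getFileInfo": read the length
    Thread.sleep(50);                       // window for a concurrent appender
    int realLen = fileLen.getAndAdd(bytes); // "append": the file actually grows
    if (statLen != realLen) {
      // This writer positioned its data (and its partial checksum chunk)
      // using statLen, but the file was already realLen bytes long.
      System.out.println(who + ": stale length " + statLen
          + ", real offset " + realLen);
    }
  }

  public static void main(String[] args) throws Exception {
    Runnable task = () -> {
      try {
        appendChunk(Thread.currentThread().getName(), 123);
      } catch (InterruptedException ignored) {
      }
    };
    Thread a = new Thread(task, "A");
    Thread b = new Thread(task, "B");
    a.start();
    b.start();
    a.join();
    b.join();
  }
}

Run concurrently, one of the two threads almost always reports a mismatch: it planned its write from a length that was already outdated, which is the analogue of the chunk misplacement the patch prevents.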

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -1666,7 +1666,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
         }
         return null;
       }
-      return callAppend(stat, src, buffersize, progress);
+      return callAppend(src, buffersize, progress);
     }
     return null;
   }
@@ -1738,7 +1738,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   /** Method to get stream returned by append call */
-  private DFSOutputStream callAppend(HdfsFileStatus stat, String src,
+  private DFSOutputStream callAppend(String src,
       int buffersize, Progressable progress) throws IOException {
     LocatedBlock lastBlock = null;
     try {
@@ -1752,8 +1752,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
           UnresolvedPathException.class,
           SnapshotAccessControlException.class);
     }
+    HdfsFileStatus newStat = getFileInfo(src);
     return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
-        lastBlock, stat, dfsClientConf.createChecksum());
+        lastBlock, newStat, dfsClientConf.createChecksum());
   }
 
   /**
@@ -1777,12 +1778,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private DFSOutputStream append(String src, int buffersize, Progressable progress)
       throws IOException {
     checkOpen();
-    HdfsFileStatus stat = getFileInfo(src);
-    if (stat == null) { // No file found
-      throw new FileNotFoundException("failed to append to non-existent file "
-          + src + " on client " + clientName);
-    }
-    final DFSOutputStream result = callAppend(stat, src, buffersize, progress);
+    final DFSOutputStream result = callAppend(src, buffersize, progress);
     beginFileLease(result.getFileId(), result);
     return result;
   }
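
The shape of the fix, as the hunks above show: the getFileInfo() call moves out of append() and into callAppend(), after namenode.append() has succeeded. A successful append takes the file's lease on the NameNode, and a concurrent appender gets an IOException instead (the new test below leans on exactly that), so nothing can change the file between the lease grant and the follow-up getFileInfo(). The HdfsFileStatus handed to DFSOutputStream.newStreamForAppend() is therefore always consistent with the returned last block, and the removed FileNotFoundException branch is no longer needed because namenode.append() itself rejects a non-existent file.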

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java

@@ -25,15 +25,23 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
+import org.mockito.invocation.InvocationOnMock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import org.mockito.stubbing.Answer;
+
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClientAdapter;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -361,4 +369,79 @@ public class TestFileAppend3 {
     AppendTestUtil.checkFullFile(fs, p, fileLen,
         fileContents, "Failed to append to a partial chunk");
   }
+
+  // Do small appends.
+  void doSmallAppends(Path file, DistributedFileSystem fs, int iterations)
+      throws IOException {
+    for (int i = 0; i < iterations; i++) {
+      FSDataOutputStream stm;
+      try {
+        stm = fs.append(file);
+      } catch (IOException e) {
+        // If another thread is already appending, skip this time.
+        continue;
+      }
+      // Failure in write or close will be terminal.
+      AppendTestUtil.write(stm, 0, 123);
+      stm.close();
+    }
+  }
+
+  @Test
+  public void testSmallAppendRace() throws Exception {
+    final Path file = new Path("/testSmallAppendRace");
+    final String fName = file.toUri().getPath();
+
+    // Create the file and write a small amount of data.
+    FSDataOutputStream stm = fs.create(file);
+    AppendTestUtil.write(stm, 0, 123);
+    stm.close();
+
+    // Introduce a delay between getFileInfo and calling append() against NN.
+    final DFSClient client = DFSClientAdapter.getDFSClient(fs);
+    DFSClient spyClient = spy(client);
+    when(spyClient.getFileInfo(fName)).thenAnswer(new Answer<HdfsFileStatus>() {
+      @Override
+      public HdfsFileStatus answer(InvocationOnMock invocation) {
+        try {
+          HdfsFileStatus stat = client.getFileInfo(fName);
+          Thread.sleep(100);
+          return stat;
+        } catch (Exception e) {
+          return null;
+        }
+      }
+    });
+    DFSClientAdapter.setDFSClient(fs, spyClient);
+
+    // Create two threads for doing appends to the same file.
+    Thread worker1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          doSmallAppends(file, fs, 20);
+        } catch (IOException e) {
+        }
+      }
+    };
+
+    Thread worker2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          doSmallAppends(file, fs, 20);
+        } catch (IOException e) {
+        }
+      }
+    };
+
+    worker1.start();
+    worker2.start();
+
+    // Append will fail when the file size crosses the checksum chunk
+    // boundary, if append was called with a stale file stat.
+    doSmallAppends(file, fs, 20);
+  }
 }
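
A note on the test's design: the Mockito spy returns the real file status and then sleeps 100 ms, deliberately widening the window between the stat lookup and the append RPC, while three appenders (two worker threads plus the main thread) interleave small appends. Each record is 123 bytes, which is not a multiple of the checksum chunk size (512 bytes under the default io.bytes.per.checksum, an assumption about the test cluster's configuration), so successive appends repeatedly start from and cross partial chunks. Before the fix, an append stream built from a stale stat would fail or corrupt data at exactly those chunk boundaries.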