HDFS-11056. Concurrent append and read operations lead to checksum error. Contributed by Wei-Chiu Chuang.

This commit is contained in:
Wei-Chiu Chuang 2016-11-09 09:15:51 -08:00
parent 367c3d4121
commit c619e9b43f
3 changed files with 126 additions and 0 deletions

View File

@ -23,6 +23,7 @@ import java.io.FileOutputStream;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.net.URI; import java.net.URI;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.file.Files; import java.nio.file.Files;
@ -47,6 +48,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline; import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
@ -1102,6 +1105,28 @@ public class FsVolumeImpl implements FsVolumeSpi {
} }
private byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException {
DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
final int checksumSize = dcs.getChecksumSize();
final long onDiskLen = blockFile.length();
final int bytesPerChecksum = dcs.getBytesPerChecksum();
if (onDiskLen % bytesPerChecksum == 0) {
// the last chunk is a complete one. No need to preserve its checksum
// because it will not be modified.
return null;
}
int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
(int)(onDiskLen / bytesPerChecksum * checksumSize);
byte[] lastChecksum = new byte[checksumSize];
RandomAccessFile raf = new RandomAccessFile(metaFile, "r");
raf.seek(offsetInChecksum);
raf.read(lastChecksum, 0, checksumSize);
return lastChecksum;
}
public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo, public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
long newGS, long estimateBlockLen) throws IOException { long newGS, long estimateBlockLen) throws IOException {
@ -1126,6 +1151,13 @@ public class FsVolumeImpl implements FsVolumeSpi {
.setBytesToReserve(bytesReserved) .setBytesToReserve(bytesReserved)
.buildLocalReplicaInPipeline(); .buildLocalReplicaInPipeline();
// load last checksum and datalen
LocalReplica localReplica = (LocalReplica)replicaInfo;
byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
localReplica.getBlockFile(), localReplica.getMetaFile());
newReplicaInfo.setLastChecksumAndDataLen(
replicaInfo.getNumBytes(), lastChunkChecksum);
// rename meta file to rbw directory // rename meta file to rbw directory
// rename block file to rbw directory // rename block file to rbw directory
newReplicaInfo.moveReplicaFrom(replicaInfo, newBlkFile); newReplicaInfo.moveReplicaFrom(replicaInfo, newBlkFile);
@ -1170,6 +1202,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
.setBytesToReserve(0) .setBytesToReserve(0)
.buildLocalReplicaInPipeline(); .buildLocalReplicaInPipeline();
rbw.setBytesAcked(visible); rbw.setBytesAcked(visible);
// load last checksum and datalen
final File destMeta = FsDatasetUtil.getMetaFile(dest,
b.getGenerationStamp());
byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
return rbw; return rbw;
} }
@ -1206,6 +1244,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
.setDirectoryToUse(blockFile.getParentFile()) .setDirectoryToUse(blockFile.getParentFile())
.setBytesToReserve(newlength) .setBytesToReserve(newlength)
.buildLocalReplicaInPipeline(); .buildLocalReplicaInPipeline();
// In theory, this rbw replica needs to reload last chunk checksum,
// but it is immediately converted to finalized state within the same lock,
// so no need to update it.
return newReplicaInfo; return newReplicaInfo;
} }

View File

@ -24,10 +24,12 @@ import static org.junit.Assert.fail;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
@ -44,10 +46,16 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -61,6 +69,8 @@ public class TestFileAppend{
private static byte[] fileContents = null; private static byte[] fileContents = null;
static final DataChecksum DEFAULT_CHECKSUM =
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
// //
// writes to file but does not close it // writes to file but does not close it
// //
@ -655,4 +665,65 @@ public class TestFileAppend{
cluster.shutdown(); cluster.shutdown();
} }
} }
@Test(timeout = 10000)
public void testConcurrentAppendRead()
throws IOException, TimeoutException, InterruptedException {
// Create a finalized replica and append to it
// Read block data and checksum. Verify checksum.
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setInt("dfs.min.replication", 1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
// create a file with 1 byte of data.
long initialFileLength = 1;
DistributedFileSystem fs = cluster.getFileSystem();
Path fileName = new Path("/appendCorruptBlock");
DFSTestUtil.createFile(fs, fileName, initialFileLength, (short) 1, 0);
DFSTestUtil.waitReplication(fs, fileName, (short) 1);
Assert.assertTrue("File not created", fs.exists(fileName));
// Call FsDatasetImpl#append to append the block file,
// which converts it to a rbw replica.
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
long newGS = block.getGenerationStamp()+1;
ReplicaHandler
replicaHandler = dataSet.append(block, newGS, initialFileLength);
// write data to block file
ReplicaBeingWritten rbw =
(ReplicaBeingWritten)replicaHandler.getReplica();
ReplicaOutputStreams
outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM);
OutputStream dataOutput = outputStreams.getDataOut();
byte[] appendBytes = new byte[1];
dataOutput.write(appendBytes, 0, 1);
dataOutput.flush();
dataOutput.close();
// update checksum file
final int smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
FsDatasetUtil.computeChecksum(
rbw.getMetaFile(), rbw.getMetaFile(), rbw.getBlockFile(),
smallBufferSize, conf);
// read the block
// the DataNode BlockSender should read from the rbw replica's in-memory
// checksum, rather than on-disk checksum. Otherwise it will see a
// checksum mismatch error.
final byte[] readBlock = DFSTestUtil.readFileBuffer(fs, fileName);
assertEquals("should have read only one byte!", 1, readBlock.length);
} finally {
cluster.shutdown();
}
}
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.DF;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@ -43,10 +44,13 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
@ -67,6 +71,8 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
LogFactory.getLog(FsDatasetImplTestUtils.class); LogFactory.getLog(FsDatasetImplTestUtils.class);
private final FsDatasetImpl dataset; private final FsDatasetImpl dataset;
private static final DataChecksum DEFAULT_CHECKSUM =
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
/** /**
* By default we assume 2 data directories (volumes) per DataNode. * By default we assume 2 data directories (volumes) per DataNode.
*/ */
@ -245,9 +251,17 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
dataset.volumeMap.add(block.getBlockPoolId(), info); dataset.volumeMap.add(block.getBlockPoolId(), info);
info.getBlockFile().createNewFile(); info.getBlockFile().createNewFile();
info.getMetaFile().createNewFile(); info.getMetaFile().createNewFile();
saveMetaFileHeader(info.getMetaFile());
return info; return info;
} }
private void saveMetaFileHeader(File metaFile) throws IOException {
DataOutputStream metaOut = new DataOutputStream(
new FileOutputStream(metaFile));
BlockMetadataHeader.writeHeader(metaOut, DEFAULT_CHECKSUM);
metaOut.close();
}
@Override @Override
public Replica createReplicaInPipeline(ExtendedBlock block) public Replica createReplicaInPipeline(ExtendedBlock block)
throws IOException { throws IOException {