HDFS-11056. Concurrent append and read operations lead to checksum error. Contributed by Wei-Chiu Chuang.

Wei-Chiu Chuang 2016-11-14 11:57:51 -08:00
parent c997fc629d
commit 81bf6f2834
5 changed files with 141 additions and 3 deletions
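For context before the file diffs (this sketch is not part of the patch): an append rewrites the on-disk checksum of the block's last, partially filled chunk, so a concurrent reader that validates the bytes it was sent against that rewritten checksum reports a mismatch. The patch preserves the pre-append checksum of the last partial chunk in memory on the RBW replica (setLastChecksumAndDataLen) so the DataNode's BlockSender can serve readers from it instead of the on-disk copy. A minimal standalone illustration of the mismatch, using java.util.zip.CRC32C and a 512-byte chunk size as stand-ins for Hadoop's DataChecksum:

import java.util.zip.CRC32C;

public class PartialChunkChecksumRaceSketch {
  private static final int BYTES_PER_CHECKSUM = 512; // illustrative chunk size

  private static long checksumOf(byte[] data, int len) {
    CRC32C crc = new CRC32C();
    crc.update(data, 0, len);
    return crc.getValue();
  }

  public static void main(String[] args) {
    byte[] chunk = new byte[BYTES_PER_CHECKSUM];
    chunk[0] = 1;                                    // block currently holds 1 byte
    long preAppendChecksum = checksumOf(chunk, 1);   // checksum the reader expects

    // An appender adds a second byte to the still-partial chunk and rewrites
    // the on-disk checksum so that it now covers both bytes.
    chunk[1] = 2;
    long rewrittenChecksum = checksumOf(chunk, 2);

    // A reader that was only sent the first byte, but validates it against the
    // rewritten on-disk checksum, sees a checksum error.
    boolean mismatch = checksumOf(chunk, 1) != rewrittenChecksum;
    System.out.println("pre-append checksum: " + preAppendChecksum);
    System.out.println("rewritten checksum:  " + rewrittenChecksum);
    System.out.println("1-byte read verifies: " + (mismatch ? "no (checksum error)" : "yes"));
  }
}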

CHANGES.txt

@@ -188,6 +188,9 @@ Release 2.7.4 - UNRELEASED
HDFS-9500. Fix software version counts for DataNodes during rolling upgrade.
(Erik Krogen via shv)
HDFS-11056. Concurrent append and read operations lead to checksum error.
(Wei-Chiu Chuang)
Release 2.7.3 - 2016-08-25
INCOMPATIBLE CHANGES

FsDatasetImpl.java

@@ -942,7 +942,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @param blockFile block file for which the checksum will be computed
* @throws IOException
*/
static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
throws IOException {
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
final byte[] data = new byte[1 << 16];
@@ -1079,7 +1079,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
return new ReplicaHandler(replica, ref);
}
private byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException {
DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
final int checksumSize = dcs.getChecksumSize();
final long onDiskLen = blockFile.length();
final int bytesPerChecksum = dcs.getBytesPerChecksum();
if (onDiskLen % bytesPerChecksum == 0) {
// the last chunk is a complete one. No need to preserve its checksum
// because it will not be modified.
return null;
}
int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
(int)(onDiskLen / bytesPerChecksum * checksumSize);
byte[] lastChecksum = new byte[checksumSize];
try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
raf.seek(offsetInChecksum);
raf.readFully(lastChecksum, 0, checksumSize);
}
return lastChecksum;
}
/** Append to a finalized replica
* Change a finalized replica to be a RBW replica and
* bump its generation stamp to be the newGS
@@ -1113,6 +1136,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
// load last checksum and datalen
byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
replicaInfo.getBlockFile(), replicaInfo.getMetaFile());
newReplicaInfo.setLastChecksumAndDataLen(
replicaInfo.getNumBytes(), lastChunkChecksum);
File newmeta = newReplicaInfo.getMetaFile();
// rename meta file to rbw directory
@@ -1435,6 +1465,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
blockId, numBytes, expectedGs,
v, dest.getParentFile(), Thread.currentThread(), 0);
rbw.setBytesAcked(visible);
// load last checksum and datalen
final File destMeta = FsDatasetUtil.getMetaFile(dest,
b.getGenerationStamp());
byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
// overwrite the RBW in the volume map
volumeMap.add(b.getBlockPoolId(), rbw);
return rbw;
@@ -2466,6 +2502,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newBlockId, recoveryId, volume, blockFile.getParentFile(),
newlength);
newReplicaInfo.setNumBytes(newlength);
// In theory, this rbw replica needs to reload last chunk checksum,
// but it is immediately converted to finalized state within the same
// lock, so no need to update it.
volumeMap.add(bpid, newReplicaInfo);
finalizeReplica(bpid, newReplicaInfo);
}
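As a worked illustration of the offset arithmetic in loadLastPartialChunkChecksum above (a standalone sketch, not code from this patch): the 7-byte meta-file header size, 512 bytes per checksum and 4-byte CRC width used below are illustrative assumptions matching common HDFS defaults.

public class LastPartialChunkOffsetSketch {
  public static void main(String[] args) {
    final int headerSize = 7;          // assumed BlockMetadataHeader.getHeaderSize()
    final int bytesPerChecksum = 512;  // assumed chunk size
    final int checksumSize = 4;        // assumed CRC32C checksum width
    final long onDiskLen = 1000;       // example block file length

    if (onDiskLen % bytesPerChecksum == 0) {
      // Last chunk is complete, so append will not rewrite its checksum;
      // loadLastPartialChunkChecksum returns null in this case.
      System.out.println("no partial chunk to preserve");
      return;
    }

    // One checksum entry per full chunk precedes the partial chunk's entry.
    long offsetInMeta = headerSize + (onDiskLen / bytesPerChecksum) * checksumSize;
    // For onDiskLen = 1000: 7 + (1000 / 512) * 4 = 11.
    System.out.println("last partial chunk checksum at meta offset " + offsetInMeta);
  }
}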

FsDatasetUtil.java

@@ -22,6 +22,7 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@@ -104,4 +105,16 @@ public class FsDatasetUtil {
+ blockFile + ", metaFile=" + metaFile, nfe);
}
}
/**
* Compute the checksum for a block file that does not already have
* its checksum computed, and save it to dstMeta file.
*/
public static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
throws IOException {
Preconditions.checkNotNull(srcMeta);
Preconditions.checkNotNull(dstMeta);
Preconditions.checkNotNull(blockFile);
FsDatasetImpl.computeChecksum(srcMeta, dstMeta, blockFile);
}
}

TestFileAppend.java

@@ -24,9 +24,11 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -42,9 +44,15 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.DataChecksum;
import org.junit.Assert;
import org.junit.Test;
@@ -57,6 +65,8 @@ public class TestFileAppend{
private static byte[] fileContents = null;
static final DataChecksum DEFAULT_CHECKSUM =
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
//
// writes to file but does not close it
//
@@ -603,4 +613,62 @@ public class TestFileAppend{
cluster.shutdown();
}
}
@Test(timeout = 10000)
public void testConcurrentAppendRead()
throws IOException, TimeoutException, InterruptedException {
// Create a finalized replica and append to it
// Read block data and checksum. Verify checksum.
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setInt("dfs.min.replication", 1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
// create a file with 1 byte of data.
long initialFileLength = 1;
DistributedFileSystem fs = cluster.getFileSystem();
Path fileName = new Path("/appendCorruptBlock");
DFSTestUtil.createFile(fs, fileName, initialFileLength, (short) 1, 0);
DFSTestUtil.waitReplication(fs, fileName, (short) 1);
Assert.assertTrue("File not created", fs.exists(fileName));
// Call FsDatasetImpl#append to append the block file,
// which converts it to a rbw replica.
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
long newGS = block.getGenerationStamp() + 1;
ReplicaHandler replicaHandler =
dataSet.append(block, newGS, initialFileLength);
// write data to block file
ReplicaBeingWritten rbw =
(ReplicaBeingWritten) replicaHandler.getReplica();
ReplicaOutputStreams outputStreams =
rbw.createStreams(false, DEFAULT_CHECKSUM);
OutputStream dataOutput = outputStreams.getDataOut();
byte[] appendBytes = new byte[1];
dataOutput.write(appendBytes, 0, 1);
dataOutput.flush();
dataOutput.close();
// update checksum file
FsDatasetUtil.computeChecksum(rbw.getMetaFile(), rbw.getMetaFile(),
rbw.getBlockFile());
// read the block
// the DataNode BlockSender should read from the rbw replica's in-memory
// checksum, rather than on-disk checksum. Otherwise it will see a
// checksum mismatch error.
final byte[] readBlock = DFSTestUtil.readFileBuffer(fs, fileName);
assertEquals("should have read only one byte!", 1, readBlock.length);
} finally {
cluster.shutdown();
}
}
}

TestWriteToReplica.java

@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
@@ -34,6 +38,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert;
import org.junit.Test;
@@ -47,7 +52,9 @@ public class TestWriteToReplica {
final private static int RWR = 3;
final private static int RUR = 4;
final private static int NON_EXISTENT = 5;
private static final DataChecksum DEFAULT_CHECKSUM =
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
// test close
@Test
public void testClose() throws Exception {
@@ -129,6 +136,13 @@ public class TestWriteToReplica {
cluster.shutdown();
}
}
private void saveMetaFileHeader(File metaFile) throws IOException {
DataOutputStream metaOut = new DataOutputStream(
new FileOutputStream(metaFile));
BlockMetadataHeader.writeHeader(metaOut, DEFAULT_CHECKSUM);
metaOut.close();
}
/**
* Generate testing environment and return a collection of blocks
@@ -156,6 +170,7 @@
replicasMap.add(bpid, replicaInfo);
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
saveMetaFileHeader(replicaInfo.getMetaFile());
replicasMap.add(bpid, new ReplicaInPipeline(
blocks[TEMPORARY].getBlockId(),