HDFS-11160. VolumeScanner reports write-in-progress replicas as corrupt incorrectly. Contributed by Wei-Chiu Chuang and Yongjun Zhang.

(cherry picked from commit 0cb99db9d9)

Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java

(cherry picked from commit 18be0447cde622dfaaad27f7c2b9cccb30469fef)

Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
This commit is contained in:
Wei-Chiu Chuang 2016-12-16 13:40:20 -08:00
parent 2123b664ca
commit bbf380a493
12 changed files with 232 additions and 32 deletions

View File

@ -207,6 +207,9 @@ Release 2.7.4 - UNRELEASED
HDFS-11229. HDFS-11056 failed to close meta file. (Wei-Chiu Chuang)
HDFS-11160. VolumeScanner reports write-in-progress replicas as corrupt
incorrectly. Contributed by Wei-Chiu Chuang and Yongjun Zhang.
Release 2.7.3 - 2016-08-25
INCOMPATIBLE CHANGES

View File

@ -64,7 +64,15 @@ public class BlockScanner {
/**
* The scanner configuration.
*/
private final Conf conf;
private Conf conf;
@VisibleForTesting
void setConf(Conf conf) {
this.conf = conf;
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
entry.getValue().setConf(conf);
}
}
/**
* The cached scanner configuration.

View File

@ -232,13 +232,23 @@ class BlockSender implements java.io.Closeable {
"If verifying checksum, currently must also send it.");
}
// if there is a append write happening right after the BlockSender
// is constructed, the last partial checksum maybe overwritten by the
// append, the BlockSender need to use the partial checksum before
// the append write.
ChunkChecksum chunkChecksum = null;
final long replicaVisibleLength;
synchronized(datanode.data) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
if (replica instanceof FinalizedReplica) {
// Load last checksum in case the replica is being written
// concurrently
final FinalizedReplica frep = (FinalizedReplica) replica;
chunkChecksum = frep.getLastChecksumAndDataLen();
}
}
// if there is a write in progress
ChunkChecksum chunkChecksum = null;
if (replica instanceof ReplicaBeingWritten) {
final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica;
waitForMinLength(rbw, startOffset + length);
@ -556,7 +566,6 @@ class BlockSender implements java.io.Closeable {
if (lastDataPacket && lastChunkChecksum != null) {
int start = checksumOff + checksumDataLen - checksumSize;
byte[] updatedChecksum = lastChunkChecksum.getChecksum();
if (updatedChecksum != null) {
System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -101,4 +103,31 @@ public class FinalizedReplica extends ReplicaInfo {
return super.toString()
+ "\n unlinked =" + unlinked;
}
/**
* gets the last chunk checksum and the length of the block corresponding
* to that checksum.
* Note, need to be called with the FsDataset lock acquired. May improve to
* lock only the FsVolume in the future.
* @throws IOException
*/
public ChunkChecksum getLastChecksumAndDataLen() throws IOException {
ChunkChecksum chunkChecksum = null;
try {
byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
getBlockFile(), getMetaFile());
if (lastChecksum != null) {
chunkChecksum =
new ChunkChecksum(getVisibleLength(), lastChecksum);
}
} catch (FileNotFoundException e) {
// meta file is lost. Try to continue anyway.
DataNode.LOG.warn("meta file " + getMetaFile() +
" is missing!");
} catch (IOException ioe) {
DataNode.LOG.warn("Unable to read checksum from meta file " +
getMetaFile(), ioe);
}
return chunkChecksum;
}
}

View File

@ -69,7 +69,12 @@ public class VolumeScanner extends Thread {
/**
* The configuration.
*/
private final Conf conf;
private Conf conf;
@VisibleForTesting
void setConf(Conf conf) {
this.conf = conf;
}
/**
* The DataNode this VolumEscanner is associated with.
@ -431,6 +436,7 @@ public class VolumeScanner extends Thread {
if (block == null) {
return -1; // block not found.
}
LOG.debug("start scanning block {}", block);
BlockSender blockSender = null;
try {
blockSender = new BlockSender(block, 0, -1,
@ -612,6 +618,7 @@ public class VolumeScanner extends Thread {
break;
}
if (timeout > 0) {
LOG.debug("{}: wait for {} milliseconds", this, timeout);
wait(timeout);
if (stopping) {
break;

View File

@ -179,4 +179,15 @@ public interface FsVolumeSpi {
* Get the FSDatasetSpi which this volume is a part of.
*/
public FsDatasetSpi getDataset();
/**
* Load last partial chunk checksum from checksum file.
* Need to be called with FsDataset lock acquired.
* @param blockFile
* @param metaFile
* @return the last partial checksum
* @throws IOException
*/
byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
throws IOException;
}

View File

@ -1080,30 +1080,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return new ReplicaHandler(replica, ref);
}
private byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException {
DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
final int checksumSize = dcs.getChecksumSize();
final long onDiskLen = blockFile.length();
final int bytesPerChecksum = dcs.getBytesPerChecksum();
if (onDiskLen % bytesPerChecksum == 0) {
// the last chunk is a complete one. No need to preserve its checksum
// because it will not be modified.
return null;
}
int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
(int)(onDiskLen / bytesPerChecksum * checksumSize);
byte[] lastChecksum = new byte[checksumSize];
try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
raf.seek(offsetInChecksum);
raf.read(lastChecksum, 0, checksumSize);
}
return lastChecksum;
}
/** Append to a finalized replica
* Change a finalized replica to be a RBW replica and
* bump its generation stamp to be the newGS
@ -1139,7 +1115,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
// load last checksum and datalen
byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
byte[] lastChunkChecksum = v.loadLastPartialChunkChecksum(
replicaInfo.getBlockFile(), replicaInfo.getMetaFile());
newReplicaInfo.setLastChecksumAndDataLen(
replicaInfo.getNumBytes(), lastChunkChecksum);
@ -1470,7 +1446,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// load last checksum and datalen
final File destMeta = FsDatasetUtil.getMetaFile(dest,
b.getGenerationStamp());
byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
byte[] lastChunkChecksum = v.loadLastPartialChunkChecksum(dest, destMeta);
rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
// overwrite the RBW in the volume map
volumeMap.add(b.getBlockPoolId(), rbw);

View File

@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.ClosedChannelException;
import java.io.OutputStreamWriter;
import java.nio.file.Files;
@ -51,6 +52,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -953,5 +956,41 @@ public class FsVolumeImpl implements FsVolumeSpi {
DatanodeStorage toDatanodeStorage() {
return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
}
@Override
public byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException {
// readHeader closes the temporary FileInputStream.
DataChecksum dcs = BlockMetadataHeader
.readHeader(metaFile).getChecksum();
final int checksumSize = dcs.getChecksumSize();
final long onDiskLen = blockFile.length();
final int bytesPerChecksum = dcs.getBytesPerChecksum();
if (onDiskLen % bytesPerChecksum == 0) {
// the last chunk is a complete one. No need to preserve its checksum
// because it will not be modified.
return null;
}
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
(onDiskLen / bytesPerChecksum) * checksumSize;
byte[] lastChecksum = new byte[checksumSize];
try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
raf.seek(offsetInChecksum);
int readBytes = raf.read(lastChecksum, 0, checksumSize);
if (readBytes == -1) {
throw new IOException("Expected to read " + checksumSize +
" bytes from offset " + offsetInChecksum +
" but reached end of file.");
} else if (readBytes != checksumSize) {
throw new IOException("Expected to read " + checksumSize +
" bytes from offset " + offsetInChecksum + " but read " +
readBytes + " bytes.");
}
}
return lastChecksum;
}
}

View File

@ -506,6 +506,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public FsDatasetSpi getDataset() {
throw new UnsupportedOperationException();
}
@Override
public byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException {
return null;
}
}
private final Map<String, Map<Block, BInfo>> blockMap

View File

@ -34,8 +34,12 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Supplier;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -812,4 +816,100 @@ public class TestBlockScanner {
info.blocksScanned = 0;
}
}
/**
* Test concurrent append and scan.
* @throws Exception
*/
@Test(timeout=120000)
public void testAppendWhileScanning() throws Exception {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
Configuration conf = new Configuration();
// throttle the block scanner: 1MB per second
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576);
// Set a really long scan period.
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
final int numExpectedFiles = 1;
final int numExpectedBlocks = 1;
final int numNameServices = 1;
// the initial file length can not be too small.
// Otherwise checksum file stream buffer will be pre-filled and
// BlockSender will not see the updated checksum.
final int initialFileLength = 2*1024*1024+100;
final TestContext ctx = new TestContext(conf, numNameServices);
// create one file, with one block.
ctx.createFiles(0, numExpectedFiles, initialFileLength);
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
String storageID = ctx.volumes.get(0).getStorageID();
synchronized (info) {
info.sem = new Semaphore(numExpectedBlocks*2);
info.shouldRun = true;
info.notify();
}
// VolumeScanner scans the first block when DN starts.
// Due to throttler, this should take approximately 2 seconds.
waitForRescan(info, numExpectedBlocks);
// update throttler to schedule rescan immediately.
// this number must be larger than initial file length, otherwise
// throttler prevents immediate rescan.
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
initialFileLength+32*1024);
BlockScanner.Conf newConf = new BlockScanner.Conf(conf);
ctx.datanode.getBlockScanner().setConf(newConf);
// schedule the first block for scanning
ExtendedBlock first = ctx.getFileBlock(0, 0);
ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
// append the file before VolumeScanner completes scanning the block,
// which takes approximately 2 seconds to complete.
FileSystem fs = ctx.cluster.getFileSystem();
FSDataOutputStream os = fs.append(ctx.getPath(0));
long seed = -1;
int size = 200;
final byte[] bytes = AppendTestUtil.randomBytes(seed, size);
os.write(bytes);
os.hflush();
os.close();
fs.close();
// verify that volume scanner does not find bad blocks after append.
waitForRescan(info, numExpectedBlocks);
GenericTestUtils.setLogLevel(DataNode.LOG, Level.INFO);
}
private void waitForRescan(final TestScanResultHandler.Info info,
final int numExpectedBlocks)
throws TimeoutException, InterruptedException {
LOG.info("Waiting for the first 1 blocks to be scanned.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
if (info.blocksScanned >= numExpectedBlocks) {
LOG.info("info = {}. blockScanned has now reached 1.", info);
return true;
} else {
LOG.info("info = {}. Waiting for blockScanned to reach 1.", info);
return false;
}
}
}
}, 1000, 30000);
synchronized (info) {
assertEquals("Expected 1 good block.",
numExpectedBlocks, info.goodBlocks.size());
info.goodBlocks.clear();
assertEquals("Expected 1 blocksScanned",
numExpectedBlocks, info.blocksScanned);
assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
info.blocksScanned = 0;
}
}
}

View File

@ -613,6 +613,12 @@ public class TestDirectoryScanner {
public FsDatasetSpi getDataset() {
throw new UnsupportedOperationException();
}
@Override
public byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException {
return null;
}
}
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();

View File

@ -87,6 +87,12 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
return null;
}
@Override
public byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException {
return null;
}
@Override
public BlockIterator loadBlockIterator(String bpid, String name)
throws IOException {