From aebb9127bae872835d057e1c6a6e6b3c6a8be6cd Mon Sep 17 00:00:00 2001
From: Wei-Chiu Chuang
Date: Thu, 15 Dec 2016 16:32:50 -0800
Subject: [PATCH] HDFS-11160. VolumeScanner reports write-in-progress replicas
 as corrupt incorrectly. Contributed by Wei-Chiu Chuang and Yongjun Zhang.

---
 .../hdfs/server/datanode/BlockScanner.java    |  10 +-
 .../hdfs/server/datanode/BlockSender.java     |  18 +++-
 .../server/datanode/FinalizedReplica.java     |  29 +++++
 .../hdfs/server/datanode/VolumeScanner.java   |   9 +-
 .../datanode/fsdataset/FsVolumeSpi.java       |  11 ++
 .../datanode/fsdataset/impl/FsVolumeImpl.java |  18 +++-
 .../server/datanode/SimulatedFSDataset.java   |   6 ++
 .../server/datanode/TestBlockScanner.java     | 100 ++++++++++++++++++
 .../server/datanode/TestDirectoryScanner.java |   6 ++
 .../extdataset/ExternalVolumeImpl.java        |   7 ++
 10 files changed, 203 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
index 21484fbc31f..80818957a89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -64,7 +64,15 @@ public class BlockScanner {
   /**
    * The scanner configuration.
    */
-  private final Conf conf;
+  private Conf conf;
+
+  @VisibleForTesting
+  void setConf(Conf conf) {
+    this.conf = conf;
+    for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+      entry.getValue().setConf(conf);
+    }
+  }
 
   /**
    * The cached scanner configuration.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 99597dc71db..203ee357897 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -240,14 +240,23 @@ class BlockSender implements java.io.Closeable {
       Preconditions.checkArgument(sendChecksum,
           "If verifying checksum, currently must also send it.");
     }
-    
+
+    // If an append write happens right after the BlockSender is
+    // constructed, the last partial checksum may be overwritten by the
+    // append; the BlockSender needs to use the partial checksum captured
+    // before the append write.
+    ChunkChecksum chunkChecksum = null;
     final long replicaVisibleLength;
     try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
       replica = getReplica(block, datanode);
       replicaVisibleLength = replica.getVisibleLength();
+      if (replica instanceof FinalizedReplica) {
+        // Load the last checksum in case the replica is being written to
+        // concurrently.
+        final FinalizedReplica frep = (FinalizedReplica) replica;
+        chunkChecksum = frep.getLastChecksumAndDataLen();
+      }
     }
-    // if there is a write in progress
-    ChunkChecksum chunkChecksum = null;
     if (replica.getState() == ReplicaState.RBW) {
       final ReplicaInPipeline rbw = (ReplicaInPipeline) replica;
       waitForMinLength(rbw, startOffset + length);
@@ -473,7 +482,7 @@ private static void waitForMinLength(ReplicaInPipeline rbw, long len)
           bytesOnDisk));
     }
   }
-  
+
   /**
    * Converts an IOExcpetion (not subclasses) to SocketException.
   * This is typically done to indicate to upper layers that the error
@@ -547,7 +556,6 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
     if (lastDataPacket && lastChunkChecksum != null) {
       int start = checksumOff + checksumDataLen - checksumSize;
       byte[] updatedChecksum = lastChunkChecksum.getChecksum();
-
       if (updatedChecksum != null) {
         System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
index 81a4ab4a4ad..e3e045062de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -113,4 +115,31 @@ public ReplicaRecoveryInfo createInfo() {
     throw new UnsupportedOperationException("Replica of type " + getState() +
         " does not support createInfo");
   }
+
+  /**
+   * Gets the last chunk checksum and the length of the block corresponding
+   * to that checksum.
+   * Note: this must be called with the FsDataset lock held. It may be
+   * improved to lock only the FsVolume in the future.
+   * @throws IOException
+   */
+  public ChunkChecksum getLastChecksumAndDataLen() throws IOException {
+    ChunkChecksum chunkChecksum = null;
+    try {
+      byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
+          getBlockFile(), getMetaFile());
+      if (lastChecksum != null) {
+        chunkChecksum =
+            new ChunkChecksum(getVisibleLength(), lastChecksum);
+      }
+    } catch (FileNotFoundException e) {
+      // The meta file is lost. Try to continue anyway.
+      DataNode.LOG.warn("meta file " + getMetaFile() +
+          " is missing!");
+    } catch (IOException ioe) {
+      DataNode.LOG.warn("Unable to read checksum from meta file " +
+          getMetaFile(), ioe);
+    }
+    return chunkChecksum;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 1e44fb66be7..8b29fcebd66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -69,7 +69,12 @@ public class VolumeScanner extends Thread {
   /**
    * The configuration.
    */
-  private final Conf conf;
+  private Conf conf;
+
+  @VisibleForTesting
+  void setConf(Conf conf) {
+    this.conf = conf;
+  }
 
   /**
    * The DataNode this VolumEscanner is associated with.
@@ -429,6 +434,7 @@ private long scanBlock(ExtendedBlock cblock, long bytesPerSec) {
     if (block == null) {
      return -1; // block not found.
    }
+    LOG.debug("start scanning block {}", block);
    BlockSender blockSender = null;
    try {
      blockSender = new BlockSender(block, 0, -1,
@@ -610,6 +616,7 @@ public void run() {
            break;
          }
          if (timeout > 0) {
+            LOG.debug("{}: wait for {} milliseconds", this, timeout);
            wait(timeout);
            if (stopping) {
              break;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index 4947ecfe969..8aa2fd99b7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -401,6 +401,17 @@ public long getGenStamp() {
     }
   }
 
+  /**
+   * Load the last partial chunk checksum from the checksum (meta) file.
+   * Must be called with the FsDataset lock held.
+   * @param blockFile
+   * @param metaFile
+   * @return the last partial checksum
+   * @throws IOException
+   */
+  byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
+      throws IOException;
+
   /**
    * Compile a list of {@link ScanInfo} for the blocks in
    * the block pool with id {@code bpid}.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 74ee063daa5..e1bc8865a2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -1119,7 +1119,8 @@ DatanodeStorage toDatanodeStorage() {
   }
 
 
-  private byte[] loadLastPartialChunkChecksum(
+  @Override
+  public byte[] loadLastPartialChunkChecksum(
       File blockFile, File metaFile) throws IOException {
     // readHeader closes the temporary FileInputStream.
    DataChecksum dcs = BlockMetadataHeader
@@ -1135,13 +1136,22 @@ private byte[] loadLastPartialChunkChecksum(
       return null;
     }
 
-    int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
-        (int)(onDiskLen / bytesPerChecksum * checksumSize);
+    long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+        (onDiskLen / bytesPerChecksum) * checksumSize;
     byte[] lastChecksum = new byte[checksumSize];
     try (RandomAccessFile raf = fileIoProvider.getRandomAccessFile(
         this, metaFile, "r")) {
       raf.seek(offsetInChecksum);
-      raf.read(lastChecksum, 0, checksumSize);
+      int readBytes = raf.read(lastChecksum, 0, checksumSize);
+      if (readBytes == -1) {
+        throw new IOException("Expected to read " + checksumSize +
+            " bytes from offset " + offsetInChecksum +
+            " but reached end of file.");
+      } else if (readBytes != checksumSize) {
+        throw new IOException("Expected to read " + checksumSize +
+            " bytes from offset " + offsetInChecksum + " but read " +
+            readBytes + " bytes.");
+      }
     }
     return lastChecksum;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index a0041dde78c..d3efe483695 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -556,6 +556,12 @@ public DF getUsageStats(Configuration conf) {
       return null;
     }
 
+    @Override
+    public byte[] loadLastPartialChunkChecksum(
+        File blockFile, File metaFile) throws IOException {
+      return null;
+    }
+
     @Override
     public LinkedList<ScanInfo> compileReport(String bpid,
         LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
index 6d35cc54f9e..b6278003917 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -36,8 +36,12 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Supplier;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
@@ -870,4 +874,100 @@ public Boolean get() {
     }
     info.sem.release(1);
   }
+
+  /**
+   * Test concurrent append and scan.
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testAppendWhileScanning() throws Exception {
+    GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
+    Configuration conf = new Configuration();
+    // throttle the block scanner: 1MB per second
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576);
+    // Set a really long scan period.
+    conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+    final int numExpectedFiles = 1;
+    final int numExpectedBlocks = 1;
+    final int numNameServices = 1;
+    // The initial file length cannot be too small. Otherwise the checksum
+    // file stream buffer will be pre-filled and BlockSender will not see
+    // the updated checksum.
+    final int initialFileLength = 2*1024*1024+100;
+    final TestContext ctx = new TestContext(conf, numNameServices);
+    // create one file, with one block.
+    ctx.createFiles(0, numExpectedFiles, initialFileLength);
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    String storageID = ctx.volumes.get(0).getStorageID();
+    synchronized (info) {
+      info.sem = new Semaphore(numExpectedBlocks*2);
+      info.shouldRun = true;
+      info.notify();
+    }
+    // The VolumeScanner scans the first block when the DN starts.
+    // Due to the throttler, this should take approximately 2 seconds.
+    waitForRescan(info, numExpectedBlocks);
+
+    // Update the throttler to schedule a rescan immediately. This number
+    // must be larger than the initial file length; otherwise the
+    // throttler prevents an immediate rescan.
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
+        initialFileLength+32*1024);
+    BlockScanner.Conf newConf = new BlockScanner.Conf(conf);
+    ctx.datanode.getBlockScanner().setConf(newConf);
+    // schedule the first block for scanning
+    ExtendedBlock first = ctx.getFileBlock(0, 0);
+    ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
+
+    // Append to the file before the VolumeScanner completes scanning the
+    // block, which takes approximately 2 seconds.
+    FileSystem fs = ctx.cluster.getFileSystem();
+    FSDataOutputStream os = fs.append(ctx.getPath(0));
+    long seed = -1;
+    int size = 200;
+    final byte[] bytes = AppendTestUtil.randomBytes(seed, size);
+    os.write(bytes);
+    os.hflush();
+    os.close();
+    fs.close();
+
+    // Verify that the volume scanner does not find bad blocks after append.
+    waitForRescan(info, numExpectedBlocks);
+
+    GenericTestUtils.setLogLevel(DataNode.LOG, Level.INFO);
+  }
+
+  private void waitForRescan(final TestScanResultHandler.Info info,
+      final int numExpectedBlocks)
+      throws TimeoutException, InterruptedException {
+    LOG.info("Waiting for the first 1 blocks to be scanned.");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          if (info.blocksScanned >= numExpectedBlocks) {
+            LOG.info("info = {}. blockScanned has now reached 1.", info);
+            return true;
+          } else {
+            LOG.info("info = {}. Waiting for blockScanned to reach 1.", info);
+            return false;
+          }
+        }
+      }
+    }, 1000, 30000);
+
+    synchronized (info) {
+      assertEquals("Expected 1 good block.",
+          numExpectedBlocks, info.goodBlocks.size());
+      info.goodBlocks.clear();
+      assertEquals("Expected 1 blocksScanned",
+          numExpectedBlocks, info.blocksScanned);
+      assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
+      info.blocksScanned = 0;
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index cc0915d624e..9b0aa82aa9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -898,6 +898,12 @@ public DF getUsageStats(Configuration conf) {
       return null;
     }
 
+    @Override
+    public byte[] loadLastPartialChunkChecksum(
+        File blockFile, File metaFile) throws IOException {
+      return null;
+    }
+
     @Override
     public LinkedList<ScanInfo> compileReport(String bpid,
         LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index e607de519c5..2d33e20bcf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode.extdataset;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.channels.ClosedChannelException;
@@ -108,6 +109,12 @@ public DF getUsageStats(Configuration conf) {
     return null;
   }
 
+  @Override
+  public byte[] loadLastPartialChunkChecksum(
+      File blockFile, File metaFile) throws IOException {
+    return null;
+  }
+
   @Override
   public LinkedList<ScanInfo> compileReport(String bpid,
       LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
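
Note on the FsVolumeImpl change (an illustration, not part of the patch): the last
partial chunk checksum lives in the block's meta file at
offset = headerSize + (onDiskLen / bytesPerChecksum) * checksumSize, and the patch
computes this offset in long arithmetic and validates how many bytes were read. The
standalone sketch below shows the same arithmetic with plain java.io. The class name,
the example values (7-byte header, 512-byte chunks, 4-byte checksums) and the use of
readFully() are assumptions made for illustration only; the real code reads these
values from BlockMetadataHeader/DataChecksum and goes through FileIoProvider.

// Standalone sketch: locating the last partial chunk checksum in a meta file,
// mirroring the arithmetic in FsVolumeImpl#loadLastPartialChunkChecksum above.
import java.io.IOException;
import java.io.RandomAccessFile;

public class LastPartialChecksumOffsetSketch {

  /**
   * Reads the checksum covering the last, partially filled chunk of a block.
   * Callers are expected to have checked that onDiskLen is not a multiple of
   * bytesPerChecksum (the patched method returns null in that case).
   */
  static byte[] readLastPartialChunkChecksum(String metaFilePath,
      long onDiskLen, int headerSize, int bytesPerChecksum, int checksumSize)
      throws IOException {
    // Index of the chunk containing the last byte on disk; integer division
    // rounds down to the start of the partial chunk.
    long chunkIndex = onDiskLen / bytesPerChecksum;
    // Long arithmetic: the (int) cast removed by the patch could overflow
    // for very large replicas.
    long offsetInChecksum = headerSize + chunkIndex * checksumSize;

    byte[] lastChecksum = new byte[checksumSize];
    try (RandomAccessFile raf = new RandomAccessFile(metaFilePath, "r")) {
      raf.seek(offsetInChecksum);
      // readFully is a simplification; the patch uses read() and reports a
      // detailed error message on a short read.
      raf.readFully(lastChecksum, 0, checksumSize);
    }
    return lastChecksum;
  }

  public static void main(String[] args) {
    // Example matching the test: a 2 MiB + 100 byte replica, 512-byte chunks,
    // 4-byte checksums, 7-byte meta-file header.
    long onDiskLen = 2L * 1024 * 1024 + 100;
    long offset = 7 + (onDiskLen / 512) * 4;
    System.out.println("Last partial chunk checksum offset: " + offset);
  }
}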