diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java index 456dcc1589f..3d97022ac5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java @@ -64,7 +64,15 @@ public class BlockScanner { /** * The scanner configuration. */ - private final Conf conf; + private Conf conf; + + @VisibleForTesting + void setConf(Conf conf) { + this.conf = conf; + for (Entry entry : scanners.entrySet()) { + entry.getValue().setConf(conf); + } + } /** * The cached scanner configuration. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 4d3a45a2a15..aeeef97d09c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -242,14 +242,24 @@ class BlockSender implements java.io.Closeable { Preconditions.checkArgument(sendChecksum, "If verifying checksum, currently must also send it."); } - + + // if there is a append write happening right after the BlockSender + // is constructed, the last partial checksum maybe overwritten by the + // append, the BlockSender need to use the partial checksum before + // the append write. + ChunkChecksum chunkChecksum = null; final long replicaVisibleLength; try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) { replica = getReplica(block, datanode); replicaVisibleLength = replica.getVisibleLength(); + if (replica instanceof FinalizedReplica) { + // Load last checksum in case the replica is being written + // concurrently + final FinalizedReplica frep = (FinalizedReplica) replica; + chunkChecksum = frep.getLastChecksumAndDataLen(); + } } // if there is a write in progress - ChunkChecksum chunkChecksum = null; if (replica instanceof ReplicaBeingWritten) { final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica; waitForMinLength(rbw, startOffset + length); @@ -498,7 +508,7 @@ private static void waitForMinLength(ReplicaBeingWritten rbw, long len) bytesOnDisk)); } } - + /** * Converts an IOExcpetion (not subclasses) to SocketException. * This is typically done to indicate to upper layers that the error @@ -572,7 +582,6 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out, if (lastDataPacket && lastChunkChecksum != null) { int start = checksumOff + checksumDataLen - checksumSize; byte[] updatedChecksum = lastChunkChecksum.getChecksum(); - if (updatedChecksum != null) { System.arraycopy(updatedChecksum, 0, buf, start, checksumSize); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java index 8daeb51e0df..da211913b3e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -88,4 +90,31 @@ public int hashCode() { public String toString() { return super.toString(); } + + /** + * gets the last chunk checksum and the length of the block corresponding + * to that checksum. + * Note, need to be called with the FsDataset lock acquired. May improve to + * lock only the FsVolume in the future. + * @throws IOException + */ + public ChunkChecksum getLastChecksumAndDataLen() throws IOException { + ChunkChecksum chunkChecksum = null; + try { + byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum( + getBlockFile(), getMetaFile()); + if (lastChecksum != null) { + chunkChecksum = + new ChunkChecksum(getVisibleLength(), lastChecksum); + } + } catch (FileNotFoundException e) { + // meta file is lost. Try to continue anyway. + DataNode.LOG.warn("meta file " + getMetaFile() + + " is missing!"); + } catch (IOException ioe) { + DataNode.LOG.warn("Unable to read checksum from meta file " + + getMetaFile(), ioe); + } + return chunkChecksum; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java index 3416b53011c..05a75d5f2c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java @@ -69,7 +69,12 @@ public class VolumeScanner extends Thread { /** * The configuration. */ - private final Conf conf; + private Conf conf; + + @VisibleForTesting + void setConf(Conf conf) { + this.conf = conf; + } /** * The DataNode this VolumEscanner is associated with. @@ -430,6 +435,7 @@ private long scanBlock(ExtendedBlock cblock, long bytesPerSec) { if (block == null) { return -1; // block not found. } + LOG.debug("start scanning block {}", block); BlockSender blockSender = null; try { blockSender = new BlockSender(block, 0, -1, @@ -611,6 +617,7 @@ public void run() { break; } if (timeout > 0) { + LOG.debug("{}: wait for {} milliseconds", this, timeout); wait(timeout); if (stopping) { break; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index 9e161214c86..d74fe09ba2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -186,4 +186,15 @@ interface BlockIterator extends Closeable { * Get the FSDatasetSpi which this volume is a part of. */ FsDatasetSpi getDataset(); + + /** + * Load last partial chunk checksum from checksum file. + * Need to be called with FsDataset lock acquired. + * @param blockFile + * @param metaFile + * @return the last partial checksum + * @throws IOException + */ + byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile) + throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 216b9347da3..a485110a481 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1171,30 +1171,6 @@ public ReplicaHandler append(ExtendedBlock b, } } - - private byte[] loadLastPartialChunkChecksum( - File blockFile, File metaFile) throws IOException { - DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); - final int checksumSize = dcs.getChecksumSize(); - final long onDiskLen = blockFile.length(); - final int bytesPerChecksum = dcs.getBytesPerChecksum(); - - if (onDiskLen % bytesPerChecksum == 0) { - // the last chunk is a complete one. No need to preserve its checksum - // because it will not be modified. - return null; - } - - int offsetInChecksum = BlockMetadataHeader.getHeaderSize() + - (int)(onDiskLen / bytesPerChecksum * checksumSize); - byte[] lastChecksum = new byte[checksumSize]; - try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) { - raf.seek(offsetInChecksum); - raf.read(lastChecksum, 0, checksumSize); - } - return lastChecksum; - } - /** Append to a finalized replica * Change a finalized replica to be a RBW replica and * bump its generation stamp to be the newGS @@ -1233,7 +1209,7 @@ private ReplicaBeingWritten append(String bpid, v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved); // load last checksum and datalen - byte[] lastChunkChecksum = loadLastPartialChunkChecksum( + byte[] lastChunkChecksum = v.loadLastPartialChunkChecksum( replicaInfo.getBlockFile(), replicaInfo.getMetaFile()); newReplicaInfo.setLastChecksumAndDataLen( replicaInfo.getNumBytes(), lastChunkChecksum); @@ -1621,7 +1597,7 @@ public ReplicaInPipeline convertTemporaryToRbw( // load last checksum and datalen final File destMeta = FsDatasetUtil.getMetaFile(dest, b.getGenerationStamp()); - byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta); + byte[] lastChunkChecksum = v.loadLastPartialChunkChecksum(dest, destMeta); rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum); // overwrite the RBW in the volume map volumeMap.add(b.getBlockPoolId(), rbw); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 67e7192b188..ee60d7ee831 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -23,6 +23,7 @@ import java.io.FilenameFilter; import java.io.IOException; import java.io.OutputStreamWriter; +import java.io.RandomAccessFile; import java.nio.channels.ClosedChannelException; import java.nio.file.Files; import java.nio.file.Paths; @@ -49,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; @@ -58,6 +60,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.CloseableReferenceCount; +import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; @@ -1040,5 +1043,41 @@ public StorageType getStorageType() { DatanodeStorage toDatanodeStorage() { return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType); } + + + @Override + public byte[] loadLastPartialChunkChecksum( + File blockFile, File metaFile) throws IOException { + // readHeader closes the temporary FileInputStream. + DataChecksum dcs = BlockMetadataHeader + .readHeader(metaFile).getChecksum(); + final int checksumSize = dcs.getChecksumSize(); + final long onDiskLen = blockFile.length(); + final int bytesPerChecksum = dcs.getBytesPerChecksum(); + + if (onDiskLen % bytesPerChecksum == 0) { + // the last chunk is a complete one. No need to preserve its checksum + // because it will not be modified. + return null; + } + + long offsetInChecksum = BlockMetadataHeader.getHeaderSize() + + (onDiskLen / bytesPerChecksum) * checksumSize; + byte[] lastChecksum = new byte[checksumSize]; + try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) { + raf.seek(offsetInChecksum); + int readBytes = raf.read(lastChecksum, 0, checksumSize); + if (readBytes == -1) { + throw new IOException("Expected to read " + checksumSize + + " bytes from offset " + offsetInChecksum + + " but reached end of file."); + } else if (readBytes != checksumSize) { + throw new IOException("Expected to read " + checksumSize + + " bytes from offset " + offsetInChecksum + " but read " + + readBytes + " bytes."); + } + } + return lastChecksum; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 70a3fea4b85..5a54a754409 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -523,6 +523,12 @@ public BlockIterator loadBlockIterator(String bpid, String name) public FsDatasetSpi getDataset() { throw new UnsupportedOperationException(); } + + @Override + public byte[] loadLastPartialChunkChecksum( + File blockFile, File metaFile) throws IOException { + return null; + } } private final Map> blockMap diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java index 021361b2d8a..4c57479d869 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java @@ -35,8 +35,12 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; import com.google.common.base.Supplier; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.AppendTestUtil; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica; @@ -869,4 +873,100 @@ public Boolean get() { } info.sem.release(1); } + + /** + * Test concurrent append and scan. + * @throws Exception + */ + @Test(timeout=120000) + public void testAppendWhileScanning() throws Exception { + GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL); + Configuration conf = new Configuration(); + // throttle the block scanner: 1MB per second + conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576); + // Set a really long scan period. + conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L); + conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER, + TestScanResultHandler.class.getName()); + conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L); + final int numExpectedFiles = 1; + final int numExpectedBlocks = 1; + final int numNameServices = 1; + // the initial file length can not be too small. + // Otherwise checksum file stream buffer will be pre-filled and + // BlockSender will not see the updated checksum. + final int initialFileLength = 2*1024*1024+100; + final TestContext ctx = new TestContext(conf, numNameServices); + // create one file, with one block. + ctx.createFiles(0, numExpectedFiles, initialFileLength); + final TestScanResultHandler.Info info = + TestScanResultHandler.getInfo(ctx.volumes.get(0)); + String storageID = ctx.volumes.get(0).getStorageID(); + synchronized (info) { + info.sem = new Semaphore(numExpectedBlocks*2); + info.shouldRun = true; + info.notify(); + } + // VolumeScanner scans the first block when DN starts. + // Due to throttler, this should take approximately 2 seconds. + waitForRescan(info, numExpectedBlocks); + + // update throttler to schedule rescan immediately. + // this number must be larger than initial file length, otherwise + // throttler prevents immediate rescan. + conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, + initialFileLength+32*1024); + BlockScanner.Conf newConf = new BlockScanner.Conf(conf); + ctx.datanode.getBlockScanner().setConf(newConf); + // schedule the first block for scanning + ExtendedBlock first = ctx.getFileBlock(0, 0); + ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first); + + // append the file before VolumeScanner completes scanning the block, + // which takes approximately 2 seconds to complete. + FileSystem fs = ctx.cluster.getFileSystem(); + FSDataOutputStream os = fs.append(ctx.getPath(0)); + long seed = -1; + int size = 200; + final byte[] bytes = AppendTestUtil.randomBytes(seed, size); + os.write(bytes); + os.hflush(); + os.close(); + fs.close(); + + // verify that volume scanner does not find bad blocks after append. + waitForRescan(info, numExpectedBlocks); + + GenericTestUtils.setLogLevel(DataNode.LOG, Level.INFO); + } + + private void waitForRescan(final TestScanResultHandler.Info info, + final int numExpectedBlocks) + throws TimeoutException, InterruptedException { + LOG.info("Waiting for the first 1 blocks to be scanned."); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + synchronized (info) { + if (info.blocksScanned >= numExpectedBlocks) { + LOG.info("info = {}. blockScanned has now reached 1.", info); + return true; + } else { + LOG.info("info = {}. Waiting for blockScanned to reach 1.", info); + return false; + } + } + } + }, 1000, 30000); + + synchronized (info) { + assertEquals("Expected 1 good block.", + numExpectedBlocks, info.goodBlocks.size()); + info.goodBlocks.clear(); + assertEquals("Expected 1 blocksScanned", + numExpectedBlocks, info.blocksScanned); + assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size()); + info.blocksScanned = 0; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 82bbdc4b7ef..35bd7e8bd37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -878,6 +878,12 @@ public BlockIterator loadBlockIterator(String bpid, String name) public FsDatasetSpi getDataset() { throw new UnsupportedOperationException(); } + + @Override + public byte[] loadLastPartialChunkChecksum( + File blockFile, File metaFile) throws IOException { + return null; + } } private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java index 8af54db4cf6..071b1a61042 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java @@ -91,6 +91,12 @@ public BlockIterator newBlockIterator(String bpid, String name) { return null; } + @Override + public byte[] loadLastPartialChunkChecksum( + File blockFile, File metaFile) throws IOException { + return null; + } + @Override public BlockIterator loadBlockIterator(String bpid, String name) throws IOException {