diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c6d32d009d0..d1c6bf746ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1072,6 +1072,9 @@ Release 2.7.0 - UNRELEASED HDFS-8039. Fix TestDebugAdmin#testRecoverLease and testVerifyBlockChecksumCommand on Windows. (Xiaoyu Yao via cnauroth) + HDFS-7996. After swapping a volume, BlockReceiver reports + ReplicaNotFoundException (Lei (Eddy) Xu via Colin P. McCabe) + BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS HDFS-7720. Quota by Storage Type API, tools and ClientNameNode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 4e8ce94ab2e..58cb8b1a51b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -281,7 +281,7 @@ class BlockReceiver implements Closeable { } /** - * close files. + * close files and release volume reference. */ @Override public void close() throws IOException { @@ -798,17 +798,20 @@ class BlockReceiver implements Closeable { // then finalize block or convert temporary to RBW. // For client-writes, the block is finalized in the PacketResponder. if (isDatanode || isTransfer) { - // close the block/crc files - close(); - block.setNumBytes(replicaInfo.getNumBytes()); + // Hold a volume reference to finalize block. + try (ReplicaHandler handler = claimReplicaHandler()) { + // close the block/crc files + close(); + block.setNumBytes(replicaInfo.getNumBytes()); - if (stage == BlockConstructionStage.TRANSFER_RBW) { - // for TRANSFER_RBW, convert temporary to RBW - datanode.data.convertTemporaryToRbw(block); - } else { - // for isDatnode or TRANSFER_FINALIZED - // Finalize the block. - datanode.data.finalizeBlock(block); + if (stage == BlockConstructionStage.TRANSFER_RBW) { + // for TRANSFER_RBW, convert temporary to RBW + datanode.data.convertTemporaryToRbw(block); + } else { + // for isDatnode or TRANSFER_FINALIZED + // Finalize the block. + datanode.data.finalizeBlock(block); + } } datanode.metrics.incrBlocksWritten(); } @@ -980,7 +983,14 @@ class BlockReceiver implements Closeable { } return partialCrc; } - + + /** The caller claims the ownership of the replica handler. */ + private ReplicaHandler claimReplicaHandler() { + ReplicaHandler handler = replicaHandler; + replicaHandler = null; + return handler; + } + private static enum PacketResponderType { NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE } @@ -1280,12 +1290,15 @@ class BlockReceiver implements Closeable { * @param startTime time when BlockReceiver started receiving the block */ private void finalizeBlock(long startTime) throws IOException { - BlockReceiver.this.close(); - final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() - : 0; - block.setNumBytes(replicaInfo.getNumBytes()); - datanode.data.finalizeBlock(block); - + long endTime = 0; + // Hold a volume reference to finalize block. + try (ReplicaHandler handler = BlockReceiver.this.claimReplicaHandler()) { + BlockReceiver.this.close(); + endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; + block.setNumBytes(replicaInfo.getNumBytes()); + datanode.data.finalizeBlock(block); + } + if (pinning) { datanode.data.setPinning(block); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index f5772e3a02c..668084b84cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; @@ -64,6 +65,8 @@ import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.hamcrest.CoreMatchers.anyOf; @@ -77,6 +80,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.timeout; public class TestDataNodeHotSwapVolumes { @@ -577,6 +581,7 @@ public class TestDataNodeHotSwapVolumes { final DataNode dn = cluster.getDataNodes().get(dataNodeIdx); final FileSystem fs = cluster.getFileSystem(); final Path testFile = new Path("/test"); + final long lastTimeDiskErrorCheck = dn.getLastDiskErrorCheck(); FSDataOutputStream out = fs.create(testFile, REPLICATION); @@ -586,6 +591,23 @@ public class TestDataNodeHotSwapVolumes { out.write(writeBuf); out.hflush(); + // Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the + // BlockReceiver releases volume reference before finalizeBlock(), the blocks + // on the volume will be removed, and finalizeBlock() throws IOE. + final FsDatasetSpi data = dn.data; + dn.data = Mockito.spy(data); + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) + throws IOException, InterruptedException { + Thread.sleep(1000); + // Bypass the argument to FsDatasetImpl#finalizeBlock to verify that + // the block is not removed, since the volume reference should not + // be released at this point. + data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0]); + return null; + } + }).when(dn.data).finalizeBlock(any(ExtendedBlock.class)); + final CyclicBarrier barrier = new CyclicBarrier(2); List oldDirs = getDataDirs(dn); @@ -612,13 +634,19 @@ public class TestDataNodeHotSwapVolumes { out.hflush(); out.close(); + reconfigThread.join(); + // Verify the file has sufficient replications. DFSTestUtil.waitReplication(fs, testFile, REPLICATION); // Read the content back byte[] content = DFSTestUtil.readFileBuffer(fs, testFile); assertEquals(BLOCK_SIZE, content.length); - reconfigThread.join(); + // If an IOException thrown from BlockReceiver#run, it triggers + // DataNode#checkDiskError(). So we can test whether checkDiskError() is called, + // to see whether there is IOException in BlockReceiver#run(). + assertEquals(lastTimeDiskErrorCheck, dn.getLastDiskErrorCheck()); + if (!exceptions.isEmpty()) { throw new IOException(exceptions.get(0).getCause()); }