diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 73838efecb3..ad4f90a0bd3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -31,6 +31,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7254. Add documentation for hot swaping DataNode drives (Lei Xu via
     Colin P. McCabe)
 
+    HDFS-6877. Avoid calling checkDisk when an HDFS volume is removed during a
+    write. (Lei Xu via Colin P. McCabe)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 4d1cc6c256c..3d497f55113 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
@@ -1229,7 +1230,28 @@ class BlockReceiver implements Closeable {
 
           if (lastPacketInBlock) {
             // Finalize the block and close the block file
-            finalizeBlock(startTime);
+            try {
+              finalizeBlock(startTime);
+            } catch (ReplicaNotFoundException e) {
+              // Verify that the exception is due to volume removal.
+              FsVolumeSpi volume;
+              synchronized (datanode.data) {
+                volume = datanode.data.getVolume(block);
+              }
+              if (volume == null) {
+                // The ReplicaInfo was removed because its backing data volume
+                // was removed, so there is no need to check for a disk error.
+                LOG.info(myString
+                    + ": BlockReceiver is interrupted because the block pool "
+                    + block.getBlockPoolId() + " has been removed.", e);
+                sendAckUpstream(ack, expected, totalAckTimeNanos, 0,
+                    Status.OOB_INTERRUPTED);
+                running = false;
+                receiverThread.interrupt();
+                continue;
+              }
+              throw e;
+            }
           }
 
           sendAckUpstream(ack, expected, totalAckTimeNanos,
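The BlockReceiver change above reduces to one pattern: treat a ReplicaNotFoundException from finalizeBlock() as benign only when the replica's volume has actually been removed, and rethrow otherwise so the normal checkDisk path still runs. Below is a condensed, self-contained sketch of that pattern. FsDatasetSpi, FsVolumeSpi, getVolume(), finalizeBlock(), and ReplicaNotFoundException are the real types and calls used by the patch; the helper class and method are hypothetical and exist only to illustrate the control flow.

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

class FinalizeHelper {
  /**
   * Returns true if the block was finalized, false if its backing volume
   * was hot-swapped out mid-write; any other failure is rethrown.
   */
  static boolean finalizeUnlessVolumeRemoved(FsDatasetSpi<?> dataset,
      ExtendedBlock block) throws IOException {
    try {
      dataset.finalizeBlock(block);
      return true;
    } catch (ReplicaNotFoundException e) {
      FsVolumeSpi volume;
      synchronized (dataset) {  // Get a consistent view of the volume list.
        volume = dataset.getVolume(block);
      }
      if (volume == null) {
        // The volume is gone: this is not a disk error, so the caller
        // should stop the write quietly instead of running checkDisk.
        return false;
      }
      throw e;  // The volume still exists: a genuine error.
    }
  }
}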
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 3f1400dfaa0..52432aef1bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -267,6 +267,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * The block size is what is in the parameter b and it must match the amount
    * of data written
    * @throws IOException
+   * @throws ReplicaNotFoundException if the replica cannot be found when the
+   * block is being finalized. For instance, the block may reside on an HDFS
+   * volume that has been removed.
    */
   public void finalizeBlock(ExtendedBlock b) throws IOException;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index fb774b78faa..50cc00d91ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -219,7 +219,7 @@ enum Status {
   CHECKSUM_OK = 6;
   ERROR_UNSUPPORTED = 7;
   OOB_RESTART = 8;            // Quick restart
-  OOB_RESERVED1 = 9;          // Reserved
+  OOB_INTERRUPTED = 9;        // Interrupted
   OOB_RESERVED2 = 10;         // Reserved
   OOB_RESERVED3 = 11;         // Reserved
   IN_PROGRESS = 12;
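One detail of the datatransfer.proto change worth calling out: renaming OOB_RESERVED1 to OOB_INTERRUPTED is wire-compatible because protobuf serializes the tag number (9), not the enum name. A minimal illustrative check against the generated DataTransferProtos class (this snippet is not part of the patch):

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

public class OobWireCompatCheck {
  public static void main(String[] args) {
    // Only the tag number travels on the wire, so a peer built before this
    // rename (which knows tag 9 as OOB_RESERVED1) still interoperates.
    System.out.println(Status.OOB_INTERRUPTED.getNumber());  // 9
    System.out.println(Status.valueOf(9));                   // OOB_INTERRUPTED
  }
}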
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index f6e984b7ed0..27cfc82e62b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -156,9 +158,12 @@ public class TestDataNodeHotSwapVolumes {
       throws IOException, TimeoutException, InterruptedException {
     int attempts = 50;  // Wait 5 seconds.
     while (attempts > 0) {
-      if (getNumReplicas(fs, file, blockIdx) == numReplicas) {
+      int actualReplicas = getNumReplicas(fs, file, blockIdx);
+      if (actualReplicas == numReplicas) {
         return;
       }
+      System.out.printf("Block %d of file %s has %d replicas (desired %d).\n",
+          blockIdx, file.toString(), actualReplicas, numReplicas);
       Thread.sleep(100);
       attempts--;
     }
@@ -167,9 +172,16 @@ public class TestDataNodeHotSwapVolumes {
   }
 
   /** Parses data dirs from DataNode's configuration. */
-  private static Collection<String> getDataDirs(DataNode datanode) {
-    return datanode.getConf().getTrimmedStringCollection(
-        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+  private static List<String> getDataDirs(DataNode datanode) {
+    return new ArrayList<String>(datanode.getConf().getTrimmedStringCollection(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+  }
+
+  /** Force the DataNode to report missing blocks immediately. */
+  private static void triggerDeleteReport(DataNode datanode)
+      throws IOException {
+    datanode.scheduleAllBlockReport(0);
+    DataNodeTestUtils.triggerDeletionReport(datanode);
   }
 
   @Test
@@ -274,7 +286,7 @@ public class TestDataNodeHotSwapVolumes {
   /**
    * Test adding one volume on a running MiniDFSCluster with only one NameNode.
    */
-  @Test
+  @Test(timeout=60000)
   public void testAddOneNewVolume()
       throws IOException, ReconfigurationException,
       InterruptedException, TimeoutException {
@@ -304,7 +316,7 @@ public class TestDataNodeHotSwapVolumes {
     verifyFileLength(cluster.getFileSystem(), testFile, numBlocks);
   }
 
-  @Test(timeout = 60000)
+  @Test(timeout=60000)
   public void testAddVolumesDuringWrite()
       throws IOException, InterruptedException, TimeoutException,
       ReconfigurationException {
@@ -336,7 +348,7 @@ public class TestDataNodeHotSwapVolumes {
     assertEquals(expectedNumBlocks, actualNumBlocks);
   }
 
-  @Test
+  @Test(timeout=60000)
   public void testAddVolumesToFederationNN()
       throws IOException, TimeoutException, InterruptedException,
       ReconfigurationException {
@@ -371,7 +383,7 @@ public class TestDataNodeHotSwapVolumes {
         Collections.frequency(actualNumBlocks.get(0), 0));
   }
 
-  @Test
+  @Test(timeout=60000)
   public void testRemoveOneVolume()
       throws ReconfigurationException, InterruptedException, TimeoutException,
       IOException {
@@ -410,12 +422,13 @@ public class TestDataNodeHotSwapVolumes {
     assertEquals(10 / 2 + 6, blocksForVolume1.getNumberOfBlocks());
   }
 
-  @Test
+  @Test(timeout=60000)
   public void testReplicatingAfterRemoveVolume()
       throws InterruptedException, TimeoutException, IOException,
       ReconfigurationException {
     startDFSCluster(1, 2);
-    final DistributedFileSystem fs = cluster.getFileSystem();
+
+    final FileSystem fs = cluster.getFileSystem();
     final short replFactor = 2;
     Path testFile = new Path("/test");
     createFile(testFile, 4, replFactor);
@@ -428,14 +441,9 @@ public class TestDataNodeHotSwapVolumes {
     assertFileLocksReleased(
         new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
 
-    // Force DataNode to report missing blocks.
-    dn.scheduleAllBlockReport(0);
-    DataNodeTestUtils.triggerDeletionReport(dn);
+    triggerDeleteReport(dn);
 
-    // The 2nd block only has 1 replica due to the removed data volume.
     waitReplication(fs, testFile, 1, 1);
-
-    // Wait NameNode to replica missing blocks.
     DFSTestUtil.waitReplication(fs, testFile, replFactor);
   }
 
@@ -478,4 +486,55 @@ public class TestDataNodeHotSwapVolumes {
       }
     }
   }
+
+  @Test(timeout=180000)
+  public void testRemoveVolumeBeingWritten()
+      throws InterruptedException, TimeoutException, ReconfigurationException,
+      IOException {
+    // Test removing a volume on each DataNode in the pipeline in turn.
+    for (int i = 0; i < 3; i++) {
+      testRemoveVolumeBeingWrittenForDatanode(i);
+    }
+  }
+
+  /**
+   * Test removing a data volume from a particular DataNode while the volume
+   * is actively being written to.
+   * @param dataNodeIdx the index of the DataNode to remove a volume from.
+   */
+  private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
+      throws IOException, ReconfigurationException, TimeoutException,
+      InterruptedException {
+    // Start a DFS cluster with 3 DataNodes to form a write pipeline.
+    startDFSCluster(1, 3);
+
+    final short REPLICATION = 3;
+    final DataNode dn = cluster.getDataNodes().get(dataNodeIdx);
+    final FileSystem fs = cluster.getFileSystem();
+    final Path testFile = new Path("/test");
+
+    FSDataOutputStream out = fs.create(testFile, REPLICATION);
+
+    Random rb = new Random(0);
+    byte[] writeBuf = new byte[BLOCK_SIZE / 2];  // Half of the block.
+    rb.nextBytes(writeBuf);
+    out.write(writeBuf);
+    out.hflush();
+
+    List<String> oldDirs = getDataDirs(dn);
+    String newDirs = oldDirs.get(1);  // Keep the second volume; drop the first.
+    dn.reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+
+    rb.nextBytes(writeBuf);
+    out.write(writeBuf);
+    out.hflush();
+    out.close();
+
+    // Verify the file still has sufficient replicas.
+    DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
+    // Read the content back.
+    byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
+    assertEquals(BLOCK_SIZE, content.length);
+  }
 }