diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index aeeef97d09c..de9c7e44773 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -305,6 +305,7 @@ class BlockSender implements java.io.Closeable { LengthInputStream metaIn = null; boolean keepMetaInOpen = false; try { + DataNodeFaultInjector.get().throwTooManyOpenFiles(); metaIn = datanode.data.getMetaDataInputStream(block); if (!corruptChecksumOk || metaIn != null) { if (metaIn == null) { @@ -334,10 +335,14 @@ class BlockSender implements java.io.Closeable { LOG.warn("Could not find metadata file for " + block); } } catch (FileNotFoundException e) { - // The replica is on its volume map but not on disk - datanode.notifyNamenodeDeletedBlock(block, replica.getStorageUuid()); - datanode.data.invalidate(block.getBlockPoolId(), - new Block[]{block.getLocalBlock()}); + if ((e.getMessage() != null) && !(e.getMessage() + .contains("Too many open files"))) { + // The replica is on its volume map but not on disk + datanode + .notifyNamenodeDeletedBlock(block, replica.getStorageUuid()); + datanode.data.invalidate(block.getBlockPoolId(), + new Block[] {block.getLocalBlock()}); + } throw e; } finally { if (!keepMetaInOpen) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 931c1241f56..c271124b294 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; +import java.io.FileNotFoundException; import java.io.IOException; /** @@ -58,4 +59,7 @@ public class DataNodeFaultInjector { public void failPipeline(ReplicaInPipelineInterface replicaInfo, String mirrorAddr) throws IOException { } + + public void throwTooManyOpenFiles() throws FileNotFoundException { + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java index 355f7a1e753..c631d10f8db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java @@ -24,8 +24,10 @@ import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.*; import java.io.Closeable; +import java.io.FileNotFoundException; import java.io.IOException; import java.lang.management.ManagementFactory; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.List; @@ -44,8 +46,10 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -331,4 +335,62 @@ public class TestDataNodeMetrics { } } } + + @Test + public void testDNShouldNotDeleteBlockONTooManyOpenFiles() + throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + conf.setLong(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1); + DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get(); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + final DataNodeFaultInjector injector = + Mockito.mock(DataNodeFaultInjector.class); + try { + // wait until the cluster is up + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + Path p = new Path("/testShouldThrowTMP"); + DFSTestUtil.writeFile(fs, p, new String("testdata")); + //Before DN throws too many open files + verifyBlockLocations(fs, p, 1); + Mockito.doThrow(new FileNotFoundException("Too many open files")). + when(injector). + throwTooManyOpenFiles(); + DataNodeFaultInjector.set(injector); + ExtendedBlock b = + fs.getClient().getLocatedBlocks(p.toString(), 0).get(0).getBlock(); + try { + new BlockSender(b, 0, -1, false, true, true, + cluster.getDataNodes().get(0), null, + CachingStrategy.newDefaultStrategy()); + fail("Must throw FileNotFoundException"); + } catch (FileNotFoundException fe) { + assertTrue("Should throw too many open files", + fe.getMessage().contains("Too many open files")); + } + cluster.triggerHeartbeats(); // IBR delete ack + //After DN throws too many open files + assertTrue(cluster.getDataNodes().get(0).getFSDataset().isValidBlock(b)); + verifyBlockLocations(fs, p, 1); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + DataNodeFaultInjector.set(oldInjector); + } + } + + private void verifyBlockLocations(DistributedFileSystem fs, Path p, + final int expected) + throws IOException, TimeoutException, InterruptedException { + final LocatedBlock lb = + fs.getClient().getLocatedBlocks(p.toString(), 0).get(0); + GenericTestUtils.waitFor(new Supplier() { + @Override public Boolean get() { + return lb.getLocations().length == expected; + } + }, 1000, 6000); + } }