diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
index 1003b957c64..ce4318531a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
@@ -598,6 +598,11 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
     sock.recvFileInputStreams(fis, buf, 0, buf.length);
     ShortCircuitReplica replica = null;
     try {
+      if (fis[0] == null || fis[1] == null) {
+        throw new IOException("the datanode " + datanode + " failed to " +
+            "pass a file descriptor (might have reached open file limit).");
+      }
+
       ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
           block.getBlockPoolId());
       if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index 4e2cedef560..ac29c3c33f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -42,6 +42,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.PeerCache;
 import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
 import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.DFSInputStream;
@@ -50,10 +54,12 @@
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
@@ -66,9 +72,11 @@
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
@@ -819,4 +827,85 @@ public void testFetchOrCreateRetries() throws Exception {
           .fetch(Mockito.eq(extendedBlockId), Mockito.any());
     }
   }
+
+  @Test
+  public void testRequestFileDescriptorsWhenULimit() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf(
+        "testRequestFileDescriptorsWhenULimit", sockDir);
+
+    final short replicas = 1;
+    final int fileSize = 3;
+    final String testFile = "/testfile";
+
+    try (MiniDFSCluster cluster =
+             new MiniDFSCluster.Builder(conf).numDataNodes(replicas).build()) {
+
+      cluster.waitActive();
+
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, new Path(testFile), fileSize, replicas, 0L);
+
+      LocatedBlock blk = new DFSClient(DFSUtilClient.getNNAddress(conf), conf)
+          .getLocatedBlocks(testFile, 0, fileSize).get(0);
+
+      ClientContext clientContext = Mockito.mock(ClientContext.class);
+      Mockito.when(clientContext.getPeerCache()).thenAnswer(
+          (Answer) peerCacheCall -> {
+            PeerCache peerCache = new PeerCache(10, Long.MAX_VALUE);
+            DomainPeer peer = Mockito.spy(getDomainPeerToDn(conf));
+            peerCache.put(blk.getLocations()[0], peer);
+
+            Mockito.when(peer.getDomainSocket()).thenAnswer(
+                (Answer) domainSocketCall -> {
+                  DomainSocket domainSocket = Mockito.mock(DomainSocket.class);
+                  Mockito.when(domainSocket
+                      .recvFileInputStreams(
+                          Mockito.any(FileInputStream[].class),
+                          Mockito.any(byte[].class),
+                          Mockito.anyInt(),
+                          Mockito.anyInt())
+                  ).thenAnswer(
+                      // we are mocking the FileInputStream array with nulls
+                      (Answer) recvFileInputStreamsCall -> null
+                  );
+                  return domainSocket;
+                }
+            );
+
+            return peerCache;
+          });
+
+      Mockito.when(clientContext.getShortCircuitCache()).thenAnswer(
+          (Answer) shortCircuitCacheCall -> {
+            ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
+            Mockito.when(cache.allocShmSlot(
+                Mockito.any(DatanodeInfo.class),
+                Mockito.any(DomainPeer.class),
+                Mockito.any(MutableBoolean.class),
+                Mockito.any(ExtendedBlockId.class),
+                Mockito.anyString()))
+                .thenAnswer((Answer) call -> null);
+
+            return cache;
+          }
+      );
+
+      DatanodeInfo[] nodes = blk.getLocations();
+
+      try {
+        Assert.assertNull(new BlockReaderFactory(new DfsClientConf(conf))
+            .setInetSocketAddress(NetUtils.createSocketAddr(nodes[0]
+                .getXferAddr()))
+            .setClientCacheContext(clientContext)
+            .setDatanodeInfo(blk.getLocations()[0])
+            .setBlock(blk.getBlock())
+            .setBlockToken(new Token())
+            .createShortCircuitReplicaInfo());
+      } catch (NullPointerException ex) {
+        Assert.fail("Should not throw NPE when the native library is unable " +
+            "to create new files!");
+      }
+    }
+  }
 }
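
Note (not part of the patch): a minimal, self-contained Java sketch of the guard the first hunk adds. Before the fix, requestFileDescriptors() used the FileInputStream array filled by recvFileInputStreams() without a null check, so a datanode that passed no descriptors (for example, because it hit its open-file limit) surfaced as a NullPointerException in the client; the guard converts that into a descriptive IOException. The class name, helper method, and datanode address below are hypothetical:

import java.io.FileInputStream;
import java.io.IOException;

public class FdGuardSketch {
  // Mirrors the null check added in BlockReaderFactory#requestFileDescriptors:
  // fail fast with a descriptive IOException when no descriptors were passed.
  static void checkReceivedFds(FileInputStream[] fis, String datanode)
      throws IOException {
    if (fis[0] == null || fis[1] == null) {
      throw new IOException("the datanode " + datanode + " failed to " +
          "pass a file descriptor (might have reached open file limit).");
    }
  }

  public static void main(String[] args) {
    // Simulates recvFileInputStreams() leaving both array entries null.
    try {
      checkReceivedFds(new FileInputStream[2], "127.0.0.1:9866");
    } catch (IOException e) {
      System.out.println("caught expected: " + e.getMessage());
    }
  }
}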