From 68290606eaca5578af281f8d418958abf89f1bb6 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 6 Jul 2018 14:59:49 -0700 Subject: [PATCH] HDFS-13121. NPE when request file descriptors when SC read. Contributed by Zsolt Venczel. (cherry picked from commit 0247cb6318507afe06816e337a19f396afc53efa) (cherry picked from commit f5f4d0b7e72eaf2ea9c9e791f71d0ea95f5b4b20) --- .../hdfs/client/impl/BlockReaderFactory.java | 5 ++ .../shortcircuit/TestShortCircuitCache.java | 89 +++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java index a6e153da24e..1626d479676 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java @@ -594,6 +594,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { sock.recvFileInputStreams(fis, buf, 0, buf.length); ShortCircuitReplica replica = null; try { + if (fis[0] == null || fis[1] == null) { + throw new IOException("the datanode " + datanode + " failed to " + + "pass a file descriptor (might have reached open file limit)."); + } + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index 5da6a250555..62eb7530b67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -42,6 +42,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.ClientContext; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.PeerCache; import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; import org.apache.hadoop.hdfs.DFSInputStream; @@ -50,10 +54,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; @@ -66,9 +72,11 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; @@ -819,4 +827,85 @@ public class TestShortCircuitCache { .fetch(Mockito.eq(extendedBlockId), Mockito.any()); } } + + @Test + public void testRequestFileDescriptorsWhenULimit() throws Exception { + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf( + "testRequestFileDescriptorsWhenULimit", sockDir); + + final short replicas = 1; + final int fileSize = 3; + final String testFile = "/testfile"; + + try (MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(replicas).build()) { + + cluster.waitActive(); + + DistributedFileSystem fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, new Path(testFile), fileSize, replicas, 0L); + + LocatedBlock blk = new DFSClient(DFSUtilClient.getNNAddress(conf), conf) + .getLocatedBlocks(testFile, 0, fileSize).get(0); + + ClientContext clientContext = Mockito.mock(ClientContext.class); + Mockito.when(clientContext.getPeerCache()).thenAnswer( + (Answer) peerCacheCall -> { + PeerCache peerCache = new PeerCache(10, Long.MAX_VALUE); + DomainPeer peer = Mockito.spy(getDomainPeerToDn(conf)); + peerCache.put(blk.getLocations()[0], peer); + + Mockito.when(peer.getDomainSocket()).thenAnswer( + (Answer) domainSocketCall -> { + DomainSocket domainSocket = Mockito.mock(DomainSocket.class); + Mockito.when(domainSocket + .recvFileInputStreams( + Mockito.any(FileInputStream[].class), + Mockito.any(byte[].class), + Mockito.anyInt(), + Mockito.anyInt()) + ).thenAnswer( + // we are mocking the FileOutputStream array with nulls + (Answer) recvFileInputStreamsCall -> null + ); + return domainSocket; + } + ); + + return peerCache; + }); + + Mockito.when(clientContext.getShortCircuitCache()).thenAnswer( + (Answer) shortCircuitCacheCall -> { + ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class); + Mockito.when(cache.allocShmSlot( + Mockito.any(DatanodeInfo.class), + Mockito.any(DomainPeer.class), + Mockito.any(MutableBoolean.class), + Mockito.any(ExtendedBlockId.class), + Mockito.anyString())) + .thenAnswer((Answer) call -> null); + + return cache; + } + ); + + DatanodeInfo[] nodes = blk.getLocations(); + + try { + Assert.assertNull(new BlockReaderFactory(new DfsClientConf(conf)) + .setInetSocketAddress(NetUtils.createSocketAddr(nodes[0] + .getXferAddr())) + .setClientCacheContext(clientContext) + .setDatanodeInfo(blk.getLocations()[0]) + .setBlock(blk.getBlock()) + .setBlockToken(new Token()) + .createShortCircuitReplicaInfo()); + } catch (NullPointerException ex) { + Assert.fail("Should not throw NPE when the native library is unable " + + "to create new files!"); + } + } + } }