HDFS-11701. NPE from Unresolved Host causes permanent DFSInputStream failures. Contributed by Lokesh Jain.
This commit is contained in:
parent
456705a07c
commit
b061215ecf
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
|
|||
import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED_DEFAULT;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -238,7 +239,7 @@ public class ClientContext {
|
|||
return byteArrayManager;
|
||||
}
|
||||
|
||||
public int getNetworkDistance(DatanodeInfo datanodeInfo) {
|
||||
public int getNetworkDistance(DatanodeInfo datanodeInfo) throws IOException {
|
||||
// If applications disable the feature or the client machine can't
|
||||
// resolve its network location, clientNode will be set to null.
|
||||
if (clientNode == null) {
|
||||
|
|
|
@ -550,7 +550,11 @@ public class DFSUtilClient {
|
|||
private static final Map<String, Boolean> localAddrMap = Collections
|
||||
.synchronizedMap(new HashMap<String, Boolean>());
|
||||
|
||||
public static boolean isLocalAddress(InetSocketAddress targetAddr) {
|
||||
public static boolean isLocalAddress(InetSocketAddress targetAddr)
|
||||
throws IOException {
|
||||
if (targetAddr.isUnresolved()) {
|
||||
throw new IOException("Unresolved host: " + targetAddr);
|
||||
}
|
||||
InetAddress addr = targetAddr.getAddress();
|
||||
Boolean cached = localAddrMap.get(addr.getHostAddress());
|
||||
if (cached != null) {
|
||||
|
|
|
@ -357,28 +357,32 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
return reader;
|
||||
}
|
||||
final ShortCircuitConf scConf = conf.getShortCircuitConf();
|
||||
if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
|
||||
if (clientContext.getUseLegacyBlockReaderLocal()) {
|
||||
reader = getLegacyBlockReaderLocal();
|
||||
if (reader != null) {
|
||||
LOG.trace("{}: returning new legacy block reader local.", this);
|
||||
return reader;
|
||||
try {
|
||||
if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
|
||||
if (clientContext.getUseLegacyBlockReaderLocal()) {
|
||||
reader = getLegacyBlockReaderLocal();
|
||||
if (reader != null) {
|
||||
LOG.trace("{}: returning new legacy block reader local.", this);
|
||||
return reader;
|
||||
}
|
||||
} else {
|
||||
reader = getBlockReaderLocal();
|
||||
if (reader != null) {
|
||||
LOG.trace("{}: returning new block reader local.", this);
|
||||
return reader;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
reader = getBlockReaderLocal();
|
||||
}
|
||||
if (scConf.isDomainSocketDataTraffic()) {
|
||||
reader = getRemoteBlockReaderFromDomain();
|
||||
if (reader != null) {
|
||||
LOG.trace("{}: returning new block reader local.", this);
|
||||
LOG.trace("{}: returning new remote block reader using UNIX domain "
|
||||
+ "socket on {}", this, pathInfo.getPath());
|
||||
return reader;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (scConf.isDomainSocketDataTraffic()) {
|
||||
reader = getRemoteBlockReaderFromDomain();
|
||||
if (reader != null) {
|
||||
LOG.trace("{}: returning new remote block reader using UNIX domain "
|
||||
+ "socket on {}", this, pathInfo.getPath());
|
||||
return reader;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Block read failed. Getting remote block reader using TCP", e);
|
||||
}
|
||||
Preconditions.checkState(!DFSInputStream.tcpReadsDisabledForTesting,
|
||||
"TCP reads were disabled for testing, but we failed to " +
|
||||
|
@ -469,7 +473,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
return null;
|
||||
}
|
||||
|
||||
private BlockReader getBlockReaderLocal() throws InvalidToken {
|
||||
private BlockReader getBlockReaderLocal() throws IOException {
|
||||
LOG.trace("{}: trying to construct a BlockReaderLocal for short-circuit "
|
||||
+ " reads.", this);
|
||||
if (pathInfo == null) {
|
||||
|
|
|
@ -133,7 +133,8 @@ public class DomainSocketFactory {
|
|||
*
|
||||
* @return Information about the socket path.
|
||||
*/
|
||||
public PathInfo getPathInfo(InetSocketAddress addr, ShortCircuitConf conf) {
|
||||
public PathInfo getPathInfo(InetSocketAddress addr, ShortCircuitConf conf)
|
||||
throws IOException {
|
||||
// If there is no domain socket path configured, we can't use domain
|
||||
// sockets.
|
||||
if (conf.getDomainSocketPath().isEmpty()) return PathInfo.NOT_CONFIGURED;
|
||||
|
|
|
@ -28,6 +28,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.channels.ClosedByInterruptException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
@ -68,6 +70,7 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -79,6 +82,9 @@ public class TestBlockReaderFactory {
|
|||
@Rule
|
||||
public final Timeout globalTimeout = new Timeout(180000);
|
||||
|
||||
@Rule
|
||||
public ExpectedException thrown = ExpectedException.none();
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
DomainSocket.disableBindPathValidation();
|
||||
|
@ -144,6 +150,33 @@ public class TestBlockReaderFactory {
|
|||
sockDir.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the case where address passed to DomainSocketFactory#getPathInfo is
|
||||
* unresolved. In such a case an exception should be thrown.
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
public void testGetPathInfoWithUnresolvedHost() throws Exception {
|
||||
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
||||
|
||||
Configuration conf =
|
||||
createShortCircuitConf("testGetPathInfoWithUnresolvedHost", sockDir);
|
||||
conf.set(DFS_CLIENT_CONTEXT,
|
||||
"testGetPathInfoWithUnresolvedHost_Context");
|
||||
conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
|
||||
|
||||
DfsClientConf.ShortCircuitConf shortCircuitConf =
|
||||
new DfsClientConf.ShortCircuitConf(conf);
|
||||
DomainSocketFactory domainSocketFactory =
|
||||
new DomainSocketFactory(shortCircuitConf);
|
||||
InetSocketAddress targetAddr =
|
||||
InetSocketAddress.createUnresolved("random", 32456);
|
||||
|
||||
thrown.expect(IOException.class);
|
||||
thrown.expectMessage("Unresolved host: " + targetAddr);
|
||||
domainSocketFactory.getPathInfo(targetAddr, shortCircuitConf);
|
||||
sockDir.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the case where we have multiple threads waiting on the
|
||||
* ShortCircuitCache delivering a certain ShortCircuitReplica.
|
||||
|
|
Loading…
Reference in New Issue