HADOOP-17222. Create socket address leveraging URI cache (#2817)

Contributed by fanrui.

Signed-off-by: Mingliang Liu <liuml07@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
Stephen O'Donnell 2021-03-30 11:59:44 +01:00 committed by GitHub
parent c96fbb9199
commit 56ef16468a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 168 additions and 33 deletions

View File

@ -39,6 +39,7 @@ import java.net.ConnectException;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@ -46,6 +47,8 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.net.SocketFactory;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.thirdparty.com.google.common.cache.Cache;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.commons.net.util.SubnetUtils;
import org.apache.commons.net.util.SubnetUtils.SubnetInfo;
import org.apache.hadoop.classification.InterfaceAudience;
@ -178,11 +181,33 @@ public class NetUtils {
* include a port number
* @param configName the name of the configuration from which
* <code>target</code> was loaded. This is used in the
* exception message in the case that parsing fails.
* exception message in the case that parsing fails.
*/
public static InetSocketAddress createSocketAddr(String target,
int defaultPort,
String configName) {
return createSocketAddr(target, defaultPort, configName, false);
}
/**
* Create an InetSocketAddress from the given target string and
* default port. If the string cannot be parsed correctly, the
* <code>configName</code> parameter is used as part of the
* exception message, allowing the user to better diagnose
* the misconfiguration.
*
* @param target a string of either "host" or "host:port"
* @param defaultPort the default port if <code>target</code> does not
* include a port number
* @param configName the name of the configuration from which
* <code>target</code> was loaded. This is used in the
* exception message in the case that parsing fails.
* @param useCacheIfPresent Whether use cache when create URI
*/
public static InetSocketAddress createSocketAddr(String target,
int defaultPort,
String configName,
boolean useCacheIfPresent) {
String helpText = "";
if (configName != null) {
helpText = " (configuration property '" + configName + "')";
@ -192,15 +217,8 @@ public class NetUtils {
helpText);
}
target = target.trim();
boolean hasScheme = target.contains("://");
URI uri = null;
try {
uri = hasScheme ? URI.create(target) : URI.create("dummyscheme://"+target);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Does not contain a valid host:port authority: " + target + helpText
);
}
boolean hasScheme = target.contains("://");
URI uri = createURI(target, hasScheme, helpText, useCacheIfPresent);
String host = uri.getHost();
int port = uri.getPort();
@ -208,10 +226,9 @@ public class NetUtils {
port = defaultPort;
}
String path = uri.getPath();
if ((host == null) || (port < 0) ||
(!hasScheme && path != null && !path.isEmpty()))
{
(!hasScheme && path != null && !path.isEmpty())) {
throw new IllegalArgumentException(
"Does not contain a valid host:port authority: " + target + helpText
);
@ -219,6 +236,40 @@ public class NetUtils {
return createSocketAddrForHost(host, port);
}
private static final long URI_CACHE_SIZE_DEFAULT = 1000;
private static final long URI_CACHE_EXPIRE_TIME_DEFAULT = 12;
private static final Cache<String, URI> URI_CACHE = CacheBuilder.newBuilder()
.maximumSize(URI_CACHE_SIZE_DEFAULT)
.expireAfterWrite(URI_CACHE_EXPIRE_TIME_DEFAULT, TimeUnit.HOURS)
.build();
private static URI createURI(String target,
boolean hasScheme,
String helpText,
boolean useCacheIfPresent) {
URI uri;
if (useCacheIfPresent) {
uri = URI_CACHE.getIfPresent(target);
if (uri != null) {
return uri;
}
}
try {
uri = hasScheme ? URI.create(target) :
URI.create("dummyscheme://" + target);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Does not contain a valid host:port authority: " + target + helpText
);
}
if (useCacheIfPresent) {
URI_CACHE.put(target, uri);
}
return uri;
}
/**
* Create a socket address with the given host and port. The hostname
* might be replaced with another host that was set via

View File

@ -352,7 +352,7 @@ public class TestNetUtils {
assertEquals(1000, addr.getPort());
try {
addr = NetUtils.createSocketAddr(
NetUtils.createSocketAddr(
"127.0.0.1:blahblah", 1000, "myconfig");
fail("Should have failed to parse bad port");
} catch (IllegalArgumentException iae) {
@ -360,6 +360,49 @@ public class TestNetUtils {
}
}
@Test
public void testCreateSocketAddressWithURICache() throws Throwable {
InetSocketAddress addr = NetUtils.createSocketAddr(
"127.0.0.1:12345", 1000, "myconfig", true);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(12345, addr.getPort());
addr = NetUtils.createSocketAddr(
"127.0.0.1:12345", 1000, "myconfig", true);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(12345, addr.getPort());
// ----------------------------------------------------
addr = NetUtils.createSocketAddr(
"127.0.0.1", 1000, "myconfig", true);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(1000, addr.getPort());
addr = NetUtils.createSocketAddr(
"127.0.0.1", 1000, "myconfig", true);
assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
assertEquals(1000, addr.getPort());
// ----------------------------------------------------
try {
NetUtils.createSocketAddr(
"127.0.0.1:blahblah", 1000, "myconfig", true);
fail("Should have failed to parse bad port");
} catch (IllegalArgumentException iae) {
assertInException(iae, "myconfig");
}
try {
NetUtils.createSocketAddr(
"127.0.0.1:blahblah", 1000, "myconfig", true);
fail("Should have failed to parse bad port");
} catch (IllegalArgumentException iae) {
assertInException(iae, "myconfig");
}
}
private void assertRemoteDetailsIncluded(IOException wrapped)
throws Throwable {
assertInException(wrapped, "desthost");

View File

@ -370,6 +370,16 @@ public class TestSecurityUtil {
verifyServiceAddr(staticHost, "255.255.255.255");
}
@Test
public void testSocketAddrWithChangeIP() {
String staticHost = "host4";
NetUtils.addStaticResolution(staticHost, "255.255.255.255");
verifyServiceAddr(staticHost, "255.255.255.255");
NetUtils.addStaticResolution(staticHost, "255.255.255.254");
verifyServiceAddr(staticHost, "255.255.255.254");
}
// this is a bizarre case, but it's if a test tries to remap an ip address
@Test
public void testSocketAddrWithIPToStaticIP() {

View File

@ -1089,7 +1089,9 @@ public class DFSInputStream extends FSInputStream
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
boolean uriCacheEnabled = dfsClient.getConf().isUriCacheEnabled();
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr,
-1, null, uriCacheEnabled);
return new DNAddrPair(chosenNode, targetAddr, storageType, block);
}

View File

@ -410,6 +410,9 @@ public interface HdfsClientConfigKeys {
String PREFETCH_SIZE_KEY = PREFIX + "prefetch.size";
String URI_CACHE_KEY = PREFIX + "uri.cache.enabled";
boolean URI_CACHE_DEFAULT = false;
interface ShortCircuit {
String PREFIX = Read.PREFIX + "shortcircuit.";

View File

@ -129,6 +129,7 @@ public class DfsClientConf {
private final int blockWriteLocateFollowingMaxDelayMs;
private final long defaultBlockSize;
private final long prefetchSize;
private final boolean uriCacheEnabled;
private final short defaultReplication;
private final String taskId;
private final FsPermission uMask;
@ -211,24 +212,7 @@ public class DfsClientConf {
Write.MAX_PACKETS_IN_FLIGHT_KEY,
Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
final boolean byteArrayManagerEnabled = conf.getBoolean(
Write.ByteArrayManager.ENABLED_KEY,
Write.ByteArrayManager.ENABLED_DEFAULT);
if (!byteArrayManagerEnabled) {
writeByteArrayManagerConf = null;
} else {
final int countThreshold = conf.getInt(
Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
final int countLimit = conf.getInt(
Write.ByteArrayManager.COUNT_LIMIT_KEY,
Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
final long countResetTimePeriodMs = conf.getLong(
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
writeByteArrayManagerConf = new ByteArrayManager.Conf(
countThreshold, countLimit, countResetTimePeriodMs);
}
writeByteArrayManagerConf = loadWriteByteArrayManagerConf(conf);
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
DFS_BLOCK_SIZE_DEFAULT);
@ -240,6 +224,10 @@ public class DfsClientConf {
Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
10 * defaultBlockSize);
uriCacheEnabled = conf.getBoolean(Read.URI_CACHE_KEY,
Read.URI_CACHE_DEFAULT);
numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
numBlockWriteRetry = conf.getInt(
@ -308,6 +296,27 @@ public class DfsClientConf {
"can't be more then 5.");
}
private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
Configuration conf) {
final boolean byteArrayManagerEnabled = conf.getBoolean(
Write.ByteArrayManager.ENABLED_KEY,
Write.ByteArrayManager.ENABLED_DEFAULT);
if (!byteArrayManagerEnabled) {
return null;
}
final int countThreshold = conf.getInt(
Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
final int countLimit = conf.getInt(
Write.ByteArrayManager.COUNT_LIMIT_KEY,
Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
final long countResetTimePeriodMs = conf.getLong(
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
return new ByteArrayManager.Conf(
countThreshold, countLimit, countResetTimePeriodMs);
}
@SuppressWarnings("unchecked")
private List<Class<? extends ReplicaAccessorBuilder>>
loadReplicaAccessorBuilderClasses(Configuration conf) {
@ -555,6 +564,13 @@ public class DfsClientConf {
return prefetchSize;
}
/**
* @return the uriCacheEnable
*/
public boolean isUriCacheEnabled() {
return uriCacheEnabled;
}
/**
* @return the defaultReplication
*/

View File

@ -4173,6 +4173,15 @@
</description>
</property>
<property>
<name>dfs.client.read.uri.cache.enabled</name>
<value>false</value>
<description>
If true, dfs client will use cache when creating URI based on host:port
to reduce the frequency of URI object creation.
</description>
</property>
<property>
<name>dfs.client.read.short.circuit.replica.stale.threshold.ms</name>
<value>1800000</value>

View File

@ -44,6 +44,7 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
HdfsClientConfigKeys.Failover.class,
HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
HdfsClientConfigKeys.BlockWrite.class,
HdfsClientConfigKeys.Read.class,
HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };
// Set error modes