diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java index bd2d8c9d697..df026bb66ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation.router; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.util.Time; @@ -53,9 +54,14 @@ public class ConnectionContext { private long lastActiveTs = 0; /** The connection's active status would expire after this window. */ private final static long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30); + /** The maximum number of requests that this connection can handle concurrently. **/ + private final int maxConcurrencyPerConn; - public ConnectionContext(ProxyAndInfo connection) { + public ConnectionContext(ProxyAndInfo connection, Configuration conf) { this.client = connection; + this.maxConcurrencyPerConn = conf.getInt( + RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY, + RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT); } /** @@ -93,6 +99,23 @@ public class ConnectionContext { * @return True if the connection can be used. */ public synchronized boolean isUsable() { + return hasAvailableConcurrency() && !isClosed(); + } + + /** + * Return true if this connection context still has available concurrency, + * else return false. 
+ */ + private synchronized boolean hasAvailableConcurrency() { + return this.numThreads < maxConcurrencyPerConn; + } + + /** + * Check if the connection is idle. It checks if the connection is not used + * by another thread. + * @return True if the connection is not used by another thread. + */ + public synchronized boolean isIdle() { return !isActive() && !isClosed(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index 293a4b64d20..a2aa7c869e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -77,7 +77,6 @@ public class ConnectionPool { private static final Logger LOG = LoggerFactory.getLogger(ConnectionPool.class); - /** Configuration settings for the connection pool. */ private final Configuration conf; @@ -94,6 +93,8 @@ public class ConnectionPool { private volatile List connections = new ArrayList<>(); /** Connection index for round-robin. */ private final AtomicInteger clientIndex = new AtomicInteger(0); + /** Underlying socket index. **/ + private final AtomicInteger socketIndex = new AtomicInteger(0); /** Min number of connections per user. */ private final int minSize; @@ -105,6 +106,9 @@ public class ConnectionPool { /** The last time a connection was active. */ private volatile long lastActiveTime = 0; + /** Enable using multiple physical socket or not. **/ + private final boolean enableMultiSocket; + /** Map for the protocols and their protobuf implementations. 
*/ private final static Map, ProtoImpl> PROTO_MAP = new HashMap<>(); static { @@ -149,9 +153,12 @@ public class ConnectionPool { this.minSize = minPoolSize; this.maxSize = maxPoolSize; this.minActiveRatio = minActiveRatio; + this.enableMultiSocket = conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY, + RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT); // Add minimum connections to the pool - for (int i=0; i tmpConnections = this.connections; - int size = tmpConnections.size(); - // Inc and mask off sign bit, lookup index should be non-negative int - int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF; - for (int i=0; i 0) { + // Get a connection from the pool following round-robin + // Inc and mask off sign bit, lookup index should be non-negative int + int threadIndex = this.clientIndex.getAndIncrement() & 0x7FFFFFFF; + conn = tmpConnections.get(threadIndex % size); + } return conn; } @@ -256,10 +262,9 @@ public class ConnectionPool { int targetCount = Math.min(num, this.connections.size() - this.minSize); // Remove and close targetCount of connections List tmpConnections = new ArrayList<>(); - for (int i = 0; i < this.connections.size(); i++) { - ConnectionContext conn = this.connections.get(i); + for (ConnectionContext conn : this.connections) { // Only pick idle connections to close - if (removed.size() < targetCount && conn.isUsable()) { + if (removed.size() < targetCount && conn.isIdle()) { removed.add(conn); } else { tmpConnections.add(conn); @@ -267,8 +272,8 @@ public class ConnectionPool { } this.connections = tmpConnections; } - LOG.debug("Expected to remove {} connection " + - "and actually removed {} connections", num, removed.size()); + LOG.debug("Expected to remove {} connection and actually removed {} connections", + num, removed.size()); return removed; } @@ -303,7 +308,6 @@ public class ConnectionPool { */ protected int getNumActiveConnections() { int ret = 0; - List tmpConnections = 
this.connections; for (ConnectionContext conn : tmpConnections) { if (conn.isActive()) { @@ -320,10 +324,9 @@ public class ConnectionPool { */ protected int getNumIdleConnections() { int ret = 0; - List tmpConnections = this.connections; for (ConnectionContext conn : tmpConnections) { - if (conn.isUsable()) { + if (conn.isIdle()) { ret++; } } @@ -393,8 +396,9 @@ public class ConnectionPool { * @throws IOException If it cannot get a new connection. */ public ConnectionContext newConnection() throws IOException { - return newConnection( - this.conf, this.namenodeAddress, this.ugi, this.protocol); + return newConnection(this.conf, this.namenodeAddress, + this.ugi, this.protocol, this.enableMultiSocket, + this.socketIndex.incrementAndGet()); } /** @@ -402,19 +406,20 @@ public class ConnectionPool { * context for a single user/security context. To maximize throughput it is * recommended to use multiple connection per user+server, allowing multiple * writes and reads to be dispatched in parallel. - * @param + * @param Input type T. * * @param conf Configuration for the connection. * @param nnAddress Address of server supporting the ClientProtocol. * @param ugi User context. * @param proto Interface of the protocol. + * @param enableMultiSocket Enable multiple socket or not. * @return proto for the target ClientProtocol that contains the user's * security context. * @throws IOException If it cannot be created. */ protected static ConnectionContext newConnection(Configuration conf, - String nnAddress, UserGroupInformation ugi, Class proto) - throws IOException { + String nnAddress, UserGroupInformation ugi, Class proto, + boolean enableMultiSocket, int socketIndex) throws IOException { if (!PROTO_MAP.containsKey(proto)) { String msg = "Unsupported protocol for connection to NameNode: " + ((proto != null) ? 
proto.getName() : "null"); @@ -437,15 +442,23 @@ public class ConnectionPool { } InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress); final long version = RPC.getProtocolVersion(classes.protoPb); - Object proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi, - conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); + Object proxy; + if (enableMultiSocket) { + FederationConnectionId connectionId = new FederationConnectionId( + socket, classes.protoPb, ugi, RPC.getRpcTimeout(conf), + defaultPolicy, conf, socketIndex); + proxy = RPC.getProtocolProxy(classes.protoPb, version, connectionId, + conf, factory).getProxy(); + } else { + proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi, + conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); + } + T client = newProtoClient(proto, classes, proxy); Text dtService = SecurityUtil.buildTokenService(socket); - ProxyAndInfo clientProxy = - new ProxyAndInfo(client, dtService, socket); - ConnectionContext connection = new ConnectionContext(clientProxy); - return connection; + ProxyAndInfo clientProxy = new ProxyAndInfo(client, dtService, socket); + return new ConnectionContext(clientProxy, conf); } private static T newProtoClient(Class proto, ProtoImpl classes, @@ -453,7 +466,7 @@ public class ConnectionPool { try { Constructor constructor = classes.protoClientPb.getConstructor(classes.protoPb); - Object o = constructor.newInstance(new Object[] {proxy}); + Object o = constructor.newInstance(proxy); if (proto.isAssignableFrom(o.getClass())) { @SuppressWarnings("unchecked") T client = (T) o; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationConnectionId.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationConnectionId.java new file mode 100644 index 00000000000..0be1f8b1be2 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationConnectionId.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.security.UserGroupInformation; + +import java.net.InetSocketAddress; + +public class FederationConnectionId extends Client.ConnectionId { + private final int index; + + public FederationConnectionId(InetSocketAddress address, Class protocol, + UserGroupInformation ticket, int rpcTimeout, + RetryPolicy connectionRetryPolicy, Configuration conf, int index) { + super(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf); + this.index = index; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(super.hashCode()) + .append(this.index) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!super.equals(obj)) { + return false; + } + 
if (obj instanceof FederationConnectionId) { + FederationConnectionId other = (FederationConnectionId)obj; + return new EqualsBuilder() + .append(this.index, other.index) + .isEquals(); + } + return false; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 3b6df418089..266e3c144f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -135,6 +135,12 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_ROUTER_PREFIX + "connection.clean.ms"; public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT = TimeUnit.SECONDS.toMillis(10); + public static final String DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY = + FEDERATION_ROUTER_PREFIX + "enable.multiple.socket"; + public static final boolean DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT = false; + public static final String DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY = + FEDERATION_ROUTER_PREFIX + "max.concurrency.per.connection"; + public static final int DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT = 1; // HDFS Router RPC client public static final String DFS_ROUTER_CLIENT_THREADS_SIZE = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 51d9b8aabdd..a261ddc583c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -134,6 +134,33 @@ + + dfs.federation.router.enable.multiple.socket + false + + If enable multiple 
downstream socket or not. If true, ConnectionPool + will use a new socket when creating a new connection for the same user, + and RouterRPCClient will get a better throughput. It's best used with + dfs.federation.router.max.concurrency.per.connection together to get + a better throughput with fewer sockets. For example, enable + dfs.federation.router.enable.multiple.socket and + set dfs.federation.router.max.concurrency.per.connection = 20. + + + + dfs.federation.router.max.concurrency.per.connection + 1 + + The maximum number of requests that a connection can handle concurrently. + When the number of requests being processed by a socket is less than this value, + a new request will be processed by this socket. When + dfs.federation.router.enable.multiple.socket is enabled, it's best to + set this value greater than 1, such as 20, to avoid frequent + creation and idling of sockets in the case of a NS with jittery requests. + + + + dfs.federation.router.connection.pool.clean.ms 60000 diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md index ff2cea57612..4c6b151bcff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md @@ -416,11 +416,13 @@ The RPC server to receive connections from the clients. The Router forwards the client requests to the NameNodes. It uses a pool of connections to reduce the latency of creating them. -| Property | Default | Description| +| Property | Default | Description | |:---- |:---- |:---- | | dfs.federation.router.connection.pool-size | 1 | Size of the pool of connections from the router to namenodes. | | dfs.federation.router.connection.clean.ms | 10000 | Time interval, in milliseconds, to check if the connection pool should remove unused connections. 
| | dfs.federation.router.connection.pool.clean.ms | 60000 | Time interval, in milliseconds, to check if the connection manager should remove unused connection pools. | +| dfs.federation.router.enable.multiple.socket | false | If true, ConnectionPool will use a new socket when creating a new connection for the same user. And it's best used with dfs.federation.router.max.concurrency.per.connection together. | +| dfs.federation.router.max.concurrency.per.connection | 1 | The maximum number of requests that a connection can handle concurrently. | ### Admin server diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java index acb79cb4701..04c2540c2aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -36,6 +36,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assert.assertNotNull; @@ -131,6 +132,44 @@ public class TestConnectionManager { checkPoolConnections(TEST_USER3, 4, 2); } + @Test + public void testGetConnectionWithConcurrency() throws Exception { + Map poolMap = connManager.getPools(); + Configuration copyConf = new Configuration(conf); + copyConf.setInt(RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY, 20); + + ConnectionPool pool = new ConnectionPool( + copyConf, TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f, + ClientProtocol.class); + poolMap.put( + new ConnectionPoolId(TEST_USER1, 
TEST_NN_ADDRESS, ClientProtocol.class), + pool); + assertEquals(1, pool.getNumConnections()); + // one connection can process the maximum number of requests concurrently. + for (int i = 0; i < 20; i++) { + ConnectionContext cc = pool.getConnection(); + assertTrue(cc.isUsable()); + cc.getClient(); + } + assertEquals(1, pool.getNumConnections()); + + // Ask for more and this returns an unusable connection + ConnectionContext cc1 = pool.getConnection(); + assertTrue(cc1.isActive()); + assertFalse(cc1.isUsable()); + + // add a new connection into pool + pool.addConnection(pool.newConnection()); + // will return the new connection + ConnectionContext cc2 = pool.getConnection(); + assertTrue(cc2.isUsable()); + cc2.getClient(); + + assertEquals(2, pool.getNumConnections()); + + checkPoolConnections(TEST_USER1, 2, 2); + } + @Test public void testConnectionCreatorWithException() throws Exception { // Create a bad connection pool pointing to unresolvable namenode address. @@ -317,6 +356,6 @@ public class TestConnectionManager { "Unsupported protocol for connection to NameNode: " + TestConnectionManager.class.getName(), () -> ConnectionPool.newConnection(conf, TEST_NN_ADDRESS, TEST_USER1, - TestConnectionManager.class)); + TestConnectionManager.class, false, 0)); } }