diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index 44414ea33e1..1e7d99484c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -30,6 +30,11 @@ public final class ScmConfigKeys { "dfs.container.ipc"; public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011; + public static final String SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY = + "scm.container.client.idle.threshold"; + public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT = + 10000; + // TODO : this is copied from OzoneConsts, may need to move to a better place public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java index b9d77658f38..de706cb4b6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java @@ -19,27 +19,39 @@ package org.apache.hadoop.scm; import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT; +import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY; + /** * XceiverClientManager is responsible for the lifecycle of XceiverClient * instances. Callers use this class to acquire an XceiverClient instance * connected to the desired container pipeline. When done, the caller also uses * this class to release the previously acquired XceiverClient instance. * - * This class may evolve to implement efficient lifecycle management policies by - * caching container location information and pooling connected client instances - * for reuse without needing to reestablish a socket connection. The current - * implementation simply allocates and closes a new instance every time. + * + * This class caches connection to container for reuse purpose, such that + * accessing same container frequently will be through the same connection + * without reestablishing connection. But the connection will be closed if + * not being used for a period of time. */ public class XceiverClientManager { //TODO : change this to SCM configuration class private final Configuration conf; + private Cache openClient; + private final long staleThresholdMs; /** * Creates a new XceiverClientManager. @@ -48,13 +60,38 @@ public class XceiverClientManager { */ public XceiverClientManager(Configuration conf) { Preconditions.checkNotNull(conf); + this.staleThresholdMs = conf.getTimeDuration( + SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY, + SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS); this.conf = conf; + this.openClient = CacheBuilder.newBuilder() + .expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS) + .removalListener( + new RemovalListener() { + @Override + public void onRemoval( + RemovalNotification + removalNotification) { + // If the reference count is not 0, this xceiver client should not + // be evicted, add it back to the cache. + XceiverClientWithAccessInfo info = removalNotification.getValue(); + if (info.hasRefence()) { + synchronized (XceiverClientManager.this.openClient) { + XceiverClientManager.this + .openClient.put(removalNotification.getKey(), info); + } + } + } + }).build(); } /** * Acquires a XceiverClient connected to a container capable of storing the * specified key. * + * If there is already a cached XceiverClient, simply return the cached + * otherwise create a new one. + * * @param pipeline the container pipeline for the client connection * @return XceiverClient connected to a container * @throws IOException if an XceiverClient cannot be acquired @@ -63,13 +100,28 @@ public class XceiverClientManager { Preconditions.checkNotNull(pipeline); Preconditions.checkArgument(pipeline.getMachines() != null); Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); - XceiverClient xceiverClient = new XceiverClient(pipeline, conf); - try { - xceiverClient.connect(); - } catch (Exception e) { - throw new IOException("Exception connecting XceiverClient.", e); + String containerName = pipeline.getContainerName(); + XceiverClientWithAccessInfo info = openClient.getIfPresent(containerName); + + if (info != null) { + // we do have this connection, add reference and return + info.incrementReference(); + return info.getXceiverClient(); + } else { + // connection not found, create new, add reference and return + XceiverClient xceiverClient = new XceiverClient(pipeline, conf); + try { + xceiverClient.connect(); + } catch (Exception e) { + throw new IOException("Exception connecting XceiverClient.", e); + } + info = new XceiverClientWithAccessInfo(xceiverClient); + info.incrementReference(); + synchronized (openClient) { + openClient.put(containerName, info); + } + return xceiverClient; } - return xceiverClient; } /** @@ -79,6 +131,44 @@ public class XceiverClientManager { */ public void releaseClient(XceiverClient xceiverClient) { Preconditions.checkNotNull(xceiverClient); - xceiverClient.close(); + String containerName = xceiverClient.getPipeline().getContainerName(); + XceiverClientWithAccessInfo info; + synchronized (openClient) { + info = openClient.getIfPresent(containerName); + } + Preconditions.checkNotNull(info); + info.decrementReference(); + } + + /** + * A helper class for caching and cleaning XceiverClient. Three parameters: + * - the actual XceiverClient object + * - a time stamp representing the most recent access (acquire or release) + * - a reference count, +1 when acquire, -1 when release + */ + private static class XceiverClientWithAccessInfo { + final private XceiverClient xceiverClient; + final private AtomicInteger referenceCount; + + XceiverClientWithAccessInfo(XceiverClient xceiverClient) { + this.xceiverClient = xceiverClient; + this.referenceCount = new AtomicInteger(0); + } + + void incrementReference() { + this.referenceCount.incrementAndGet(); + } + + void decrementReference() { + this.referenceCount.decrementAndGet(); + } + + boolean hasRefence() { + return this.referenceCount.get() != 0; + } + + XceiverClient getXceiverClient() { + return xceiverClient; + } } }