HDFS-11036. Ozone: reuse Xceiver connection. Contributed by Chen Liang.
This commit is contained in:
parent
ad16978e65
commit
00684d62ca
|
@ -30,6 +30,11 @@ public final class ScmConfigKeys {
|
|||
"dfs.container.ipc";
|
||||
public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
|
||||
|
||||
public static final String SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY =
|
||||
"scm.container.client.idle.threshold";
|
||||
public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT =
|
||||
10000;
|
||||
|
||||
// TODO : this is copied from OzoneConsts, may need to move to a better place
|
||||
public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
|
||||
}
|
||||
|
|
|
@ -19,27 +19,39 @@
|
|||
package org.apache.hadoop.scm;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.RemovalListener;
|
||||
import com.google.common.cache.RemovalNotification;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
|
||||
import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
|
||||
|
||||
/**
|
||||
* XceiverClientManager is responsible for the lifecycle of XceiverClient
|
||||
* instances. Callers use this class to acquire an XceiverClient instance
|
||||
* connected to the desired container pipeline. When done, the caller also uses
|
||||
* this class to release the previously acquired XceiverClient instance.
|
||||
*
|
||||
* This class may evolve to implement efficient lifecycle management policies by
|
||||
* caching container location information and pooling connected client instances
|
||||
* for reuse without needing to reestablish a socket connection. The current
|
||||
* implementation simply allocates and closes a new instance every time.
|
||||
*
|
||||
* This class caches connection to container for reuse purpose, such that
|
||||
* accessing same container frequently will be through the same connection
|
||||
* without reestablishing connection. But the connection will be closed if
|
||||
* not being used for a period of time.
|
||||
*/
|
||||
public class XceiverClientManager {
|
||||
|
||||
//TODO : change this to SCM configuration class
|
||||
private final Configuration conf;
|
||||
private Cache<String, XceiverClientWithAccessInfo> openClient;
|
||||
private final long staleThresholdMs;
|
||||
|
||||
/**
|
||||
* Creates a new XceiverClientManager.
|
||||
|
@ -48,13 +60,38 @@ public class XceiverClientManager {
|
|||
*/
|
||||
public XceiverClientManager(Configuration conf) {
|
||||
Preconditions.checkNotNull(conf);
|
||||
this.staleThresholdMs = conf.getTimeDuration(
|
||||
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
|
||||
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
this.conf = conf;
|
||||
this.openClient = CacheBuilder.newBuilder()
|
||||
.expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS)
|
||||
.removalListener(
|
||||
new RemovalListener<String, XceiverClientWithAccessInfo>() {
|
||||
@Override
|
||||
public void onRemoval(
|
||||
RemovalNotification<String, XceiverClientWithAccessInfo>
|
||||
removalNotification) {
|
||||
// If the reference count is not 0, this xceiver client should not
|
||||
// be evicted, add it back to the cache.
|
||||
XceiverClientWithAccessInfo info = removalNotification.getValue();
|
||||
if (info.hasRefence()) {
|
||||
synchronized (XceiverClientManager.this.openClient) {
|
||||
XceiverClientManager.this
|
||||
.openClient.put(removalNotification.getKey(), info);
|
||||
}
|
||||
}
|
||||
}
|
||||
}).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a XceiverClient connected to a container capable of storing the
|
||||
* specified key.
|
||||
*
|
||||
* If there is already a cached XceiverClient, simply return the cached
|
||||
* otherwise create a new one.
|
||||
*
|
||||
* @param pipeline the container pipeline for the client connection
|
||||
* @return XceiverClient connected to a container
|
||||
* @throws IOException if an XceiverClient cannot be acquired
|
||||
|
@ -63,13 +100,28 @@ public class XceiverClientManager {
|
|||
Preconditions.checkNotNull(pipeline);
|
||||
Preconditions.checkArgument(pipeline.getMachines() != null);
|
||||
Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
|
||||
XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
|
||||
try {
|
||||
xceiverClient.connect();
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Exception connecting XceiverClient.", e);
|
||||
String containerName = pipeline.getContainerName();
|
||||
XceiverClientWithAccessInfo info = openClient.getIfPresent(containerName);
|
||||
|
||||
if (info != null) {
|
||||
// we do have this connection, add reference and return
|
||||
info.incrementReference();
|
||||
return info.getXceiverClient();
|
||||
} else {
|
||||
// connection not found, create new, add reference and return
|
||||
XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
|
||||
try {
|
||||
xceiverClient.connect();
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Exception connecting XceiverClient.", e);
|
||||
}
|
||||
info = new XceiverClientWithAccessInfo(xceiverClient);
|
||||
info.incrementReference();
|
||||
synchronized (openClient) {
|
||||
openClient.put(containerName, info);
|
||||
}
|
||||
return xceiverClient;
|
||||
}
|
||||
return xceiverClient;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -79,6 +131,44 @@ public class XceiverClientManager {
|
|||
*/
|
||||
public void releaseClient(XceiverClient xceiverClient) {
|
||||
Preconditions.checkNotNull(xceiverClient);
|
||||
xceiverClient.close();
|
||||
String containerName = xceiverClient.getPipeline().getContainerName();
|
||||
XceiverClientWithAccessInfo info;
|
||||
synchronized (openClient) {
|
||||
info = openClient.getIfPresent(containerName);
|
||||
}
|
||||
Preconditions.checkNotNull(info);
|
||||
info.decrementReference();
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper class for caching and cleaning XceiverClient. Three parameters:
|
||||
* - the actual XceiverClient object
|
||||
* - a time stamp representing the most recent access (acquire or release)
|
||||
* - a reference count, +1 when acquire, -1 when release
|
||||
*/
|
||||
private static class XceiverClientWithAccessInfo {
|
||||
final private XceiverClient xceiverClient;
|
||||
final private AtomicInteger referenceCount;
|
||||
|
||||
XceiverClientWithAccessInfo(XceiverClient xceiverClient) {
|
||||
this.xceiverClient = xceiverClient;
|
||||
this.referenceCount = new AtomicInteger(0);
|
||||
}
|
||||
|
||||
void incrementReference() {
|
||||
this.referenceCount.incrementAndGet();
|
||||
}
|
||||
|
||||
void decrementReference() {
|
||||
this.referenceCount.decrementAndGet();
|
||||
}
|
||||
|
||||
boolean hasRefence() {
|
||||
return this.referenceCount.get() != 0;
|
||||
}
|
||||
|
||||
XceiverClient getXceiverClient() {
|
||||
return xceiverClient;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue