HDFS-11887. Shared XceiverClient should be closed if there is no open clients to avoid resource leak. Contributed by Mukul Kumar Singh.

This commit is contained in:
Weiwei Yang 2017-06-06 21:20:20 +08:00
parent b8f4713861
commit 245c6fed97
10 changed files with 324 additions and 97 deletions

View File

@ -35,6 +35,11 @@ public final class ScmConfigKeys {
public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT = public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT =
10000; 10000;
public static final String SCM_CONTAINER_CLIENT_MAX_SIZE_KEY =
"scm.container.client.max.size";
public static final int SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT =
256;
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= "dfs.container.ratis.enabled"; = "dfs.container.ratis.enabled";
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT

View File

@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit;
/** /**
* A Client for the storageContainer protocol. * A Client for the storageContainer protocol.
*/ */
public class XceiverClient implements XceiverClientSpi { public class XceiverClient extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
private final Pipeline pipeline; private final Pipeline pipeline;
private final Configuration config; private final Configuration config;
@ -55,6 +55,7 @@ public class XceiverClient implements XceiverClientSpi {
* @param config -- Ozone Config * @param config -- Ozone Config
*/ */
public XceiverClient(Pipeline pipeline, Configuration config) { public XceiverClient(Pipeline pipeline, Configuration config) {
super();
Preconditions.checkNotNull(pipeline); Preconditions.checkNotNull(pipeline);
Preconditions.checkNotNull(config); Preconditions.checkNotNull(config);
this.pipeline = pipeline; this.pipeline = pipeline;

View File

@ -20,8 +20,9 @@ package org.apache.hadoop.scm;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.Callable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
@ -31,8 +32,14 @@ import com.google.common.cache.RemovalNotification;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT; import static org.apache.hadoop.scm.ScmConfigKeys
import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY; .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT;
/** /**
* XceiverClientManager is responsible for the lifecycle of XceiverClient * XceiverClientManager is responsible for the lifecycle of XceiverClient
@ -50,8 +57,7 @@ public class XceiverClientManager {
//TODO : change this to SCM configuration class //TODO : change this to SCM configuration class
private final Configuration conf; private final Configuration conf;
private Cache<String, XceiverClientWithAccessInfo> openClient; private final Cache<String, XceiverClientSpi> clientCache;
private final long staleThresholdMs;
private final boolean useRatis; private final boolean useRatis;
/** /**
@ -61,121 +67,91 @@ public class XceiverClientManager {
*/ */
public XceiverClientManager(Configuration conf) { public XceiverClientManager(Configuration conf) {
Preconditions.checkNotNull(conf); Preconditions.checkNotNull(conf);
this.staleThresholdMs = conf.getTimeDuration( int maxSize = conf.getInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT);
long staleThresholdMs = conf.getTimeDuration(
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY, SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS); SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
this.useRatis = conf.getBoolean( this.useRatis = conf.getBoolean(
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
this.conf = conf; this.conf = conf;
this.openClient = CacheBuilder.newBuilder() this.clientCache = CacheBuilder.newBuilder()
.expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS) .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
.maximumSize(maxSize)
.removalListener( .removalListener(
new RemovalListener<String, XceiverClientWithAccessInfo>() { new RemovalListener<String, XceiverClientSpi>() {
@Override @Override
public void onRemoval( public void onRemoval(
RemovalNotification<String, XceiverClientWithAccessInfo> RemovalNotification<String, XceiverClientSpi>
removalNotification) { removalNotification) {
// If the reference count is not 0, this xceiver client should not synchronized (clientCache) {
// be evicted, add it back to the cache. // Mark the entry as evicted
XceiverClientWithAccessInfo info = removalNotification.getValue(); XceiverClientSpi info = removalNotification.getValue();
if (info.hasRefence()) { info.setEvicted();
synchronized (XceiverClientManager.this.openClient) {
XceiverClientManager.this
.openClient.put(removalNotification.getKey(), info);
}
} }
} }
}).build(); }).build();
} }
@VisibleForTesting
public Cache<String, XceiverClientSpi> getClientCache() {
return clientCache;
}
/** /**
* Acquires a XceiverClient connected to a container capable of storing the * Acquires a SharedXceiverClient connected to a container capable of
* specified key. * storing the specified key.
* *
* If there is already a cached XceiverClient, simply return the cached * If there is already a cached SharedXceiverClient, simply return
* otherwise create a new one. * the cached otherwise create a new one.
* *
* @param pipeline the container pipeline for the client connection * @param pipeline the container pipeline for the client connection
* @return XceiverClient connected to a container * @return SharedXceiverClient connected to a container
* @throws IOException if an XceiverClient cannot be acquired * @throws IOException if an SharedXceiverClient cannot be acquired
*/ */
public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException { public XceiverClientSpi acquireClient(Pipeline pipeline)
throws IOException {
Preconditions.checkNotNull(pipeline); Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getMachines() != null); Preconditions.checkArgument(pipeline.getMachines() != null);
Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
String containerName = pipeline.getContainerName();
synchronized(openClient) {
XceiverClientWithAccessInfo info =
openClient.getIfPresent(containerName);
if (info != null) { synchronized (clientCache) {
// we do have this connection, add reference and return XceiverClientSpi info = getClient(pipeline);
info.incrementReference(); info.incrementReference();
return info.getXceiverClient(); return info;
} else {
// connection not found, create new, add reference and return
final XceiverClientSpi xceiverClient = useRatis ?
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
: new XceiverClient(pipeline, conf);
try {
xceiverClient.connect();
} catch (Exception e) {
throw new IOException("Exception connecting XceiverClient.", e);
}
info = new XceiverClientWithAccessInfo(xceiverClient);
info.incrementReference();
openClient.put(containerName, info);
return xceiverClient;
}
} }
} }
/** /**
* Releases an XceiverClient after use. * Releases an SharedXceiverClient after use.
* *
* @param xceiverClient client to release * @param client client to release
*/ */
public void releaseClient(XceiverClientSpi xceiverClient) { public void releaseClient(XceiverClientSpi client) {
Preconditions.checkNotNull(xceiverClient); Preconditions.checkNotNull(client);
String containerName = xceiverClient.getPipeline().getContainerName(); synchronized (clientCache) {
XceiverClientWithAccessInfo info; client.decrementReference();
synchronized (openClient) {
info = openClient.getIfPresent(containerName);
Preconditions.checkNotNull(info);
info.decrementReference();
} }
} }
/** private XceiverClientSpi getClient(Pipeline pipeline)
* A helper class for caching and cleaning XceiverClient. Three parameters: throws IOException {
* - the actual XceiverClient object String containerName = pipeline.getContainerName();
* - a time stamp representing the most recent access (acquire or release) try {
* - a reference count, +1 when acquire, -1 when release return clientCache.get(containerName,
*/ new Callable<XceiverClientSpi>() {
private static class XceiverClientWithAccessInfo { @Override
final private XceiverClientSpi xceiverClient; public XceiverClientSpi call() throws Exception {
final private AtomicInteger referenceCount; XceiverClientSpi client = useRatis ?
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
XceiverClientWithAccessInfo(XceiverClientSpi xceiverClient) { : new XceiverClient(pipeline, conf);
this.xceiverClient = xceiverClient; client.connect();
this.referenceCount = new AtomicInteger(0); return client;
} }
});
void incrementReference() { } catch (Exception e) {
this.referenceCount.incrementAndGet(); throw new IOException("Exception getting XceiverClient.", e);
}
void decrementReference() {
this.referenceCount.decrementAndGet();
}
boolean hasRefence() {
return this.referenceCount.get() != 0;
}
XceiverClientSpi getXceiverClient() {
return xceiverClient;
} }
} }
} }

View File

@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReference;
* An abstract implementation of {@link XceiverClientSpi} using Ratis. * An abstract implementation of {@link XceiverClientSpi} using Ratis.
* The underlying RPC mechanism can be chosen via the constructor. * The underlying RPC mechanism can be chosen via the constructor.
*/ */
public final class XceiverClientRatis implements XceiverClientSpi { public final class XceiverClientRatis extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class); static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
public static XceiverClientRatis newXceiverClientRatis( public static XceiverClientRatis newXceiverClientRatis(
@ -58,6 +58,7 @@ public final class XceiverClientRatis implements XceiverClientSpi {
/** Constructs a client. */ /** Constructs a client. */
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) { private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) {
super();
this.pipeline = pipeline; this.pipeline = pipeline;
this.rpcType = rpcType; this.rpcType = rpcType;
} }

View File

@ -18,24 +18,65 @@
package org.apache.hadoop.scm; package org.apache.hadoop.scm;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto
.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto
.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* A Client for the storageContainer protocol. * A Client for the storageContainer protocol.
*/ */
public interface XceiverClientSpi extends Closeable { public abstract class XceiverClientSpi implements Closeable {
final private AtomicInteger referenceCount;
private boolean isEvicted;
XceiverClientSpi() {
this.referenceCount = new AtomicInteger(0);
this.isEvicted = false;
}
void incrementReference() {
this.referenceCount.incrementAndGet();
}
void decrementReference() {
this.referenceCount.decrementAndGet();
cleanup();
}
void setEvicted() {
isEvicted = true;
cleanup();
}
// close the xceiverClient only if,
// 1) there is no refcount on the client
// 2) it has been evicted from the cache.
private void cleanup() {
if (referenceCount.get() == 0 && isEvicted) {
close();
}
}
@VisibleForTesting
public int getRefcount() {
return referenceCount.get();
}
/** /**
* Connects to the leader in the pipeline. * Connects to the leader in the pipeline.
*/ */
void connect() throws Exception; public abstract void connect() throws Exception;
@Override @Override
void close(); public abstract void close();
/** /**
* Returns the pipeline of machines that host the container used by this * Returns the pipeline of machines that host the container used by this
@ -43,7 +84,7 @@ public interface XceiverClientSpi extends Closeable {
* *
* @return pipeline of machines that host the container * @return pipeline of machines that host the container
*/ */
Pipeline getPipeline(); public abstract Pipeline getPipeline();
/** /**
* Sends a given command to server and gets the reply back. * Sends a given command to server and gets the reply back.
@ -51,6 +92,6 @@ public interface XceiverClientSpi extends Closeable {
* @return Response to the command * @return Response to the command
* @throws IOException * @throws IOException
*/ */
ContainerCommandResponseProto sendCommand( public abstract ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException; ContainerCommandRequestProto request) throws IOException;
} }

View File

@ -169,9 +169,10 @@ public class AsyncBlockWriter {
} else { } else {
Pipeline pipeline = parentCache.getPipeline(block.getBlockID()); Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
String containerName = pipeline.getContainerName(); String containerName = pipeline.getContainerName();
XceiverClientSpi client = null;
try { try {
long startTime = Time.monotonicNow(); long startTime = Time.monotonicNow();
XceiverClientSpi client = parentCache.getClientManager() client = parentCache.getClientManager()
.acquireClient(parentCache.getPipeline(block.getBlockID())); .acquireClient(parentCache.getPipeline(block.getBlockID()));
// BUG: fix the trace ID. // BUG: fix the trace ID.
ContainerProtocolCalls.writeSmallFile(client, containerName, ContainerProtocolCalls.writeSmallFile(client, containerName,
@ -192,6 +193,9 @@ public class AsyncBlockWriter {
block.getBlockID(), containerName, ex); block.getBlockID(), containerName, ex);
throw ex; throw ex;
} finally { } finally {
if (client != null) {
parentCache.getClientManager().releaseClient(client);
}
block.clearData(); block.clearData();
} }
} }

View File

@ -112,6 +112,7 @@ public class TestBufferManager {
// read the list from CBlockServer. So we mimic that action here. // read the list from CBlockServer. So we mimic that action here.
pipeline.setData(Longs.toByteArray(x)); pipeline.setData(Longs.toByteArray(x));
containerPipelines.add(pipeline); containerPipelines.add(pipeline);
xceiverClientManager.releaseClient(client);
} }
return containerPipelines; return containerPipelines;
} }

View File

@ -123,6 +123,7 @@ public class TestLocalBlockCache {
// read the list from CBlockServer. So we mimic that action here. // read the list from CBlockServer. So we mimic that action here.
pipeline.setData(Longs.toByteArray(x)); pipeline.setData(Longs.toByteArray(x));
containerPipelines.add(pipeline); containerPipelines.add(pipeline);
xceiverClientManager.releaseClient(client);
} }
return containerPipelines; return containerPipelines;
} }

View File

@ -93,6 +93,7 @@ public class TestContainerSmallFile {
traceID); traceID);
String readData = response.getData().getData().toStringUtf8(); String readData = response.getData().getData().toStringUtf8();
Assert.assertEquals("data123", readData); Assert.assertEquals("data123", readData);
xceiverClientManager.releaseClient(client);
} }
@Test @Test
@ -111,6 +112,7 @@ public class TestContainerSmallFile {
ContainerProtos.GetSmallFileResponseProto response = ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client, containerName, "key", ContainerProtocolCalls.readSmallFile(client, containerName, "key",
traceID); traceID);
xceiverClientManager.releaseClient(client);
} }
@Test @Test
@ -133,6 +135,7 @@ public class TestContainerSmallFile {
ContainerProtos.GetSmallFileResponseProto response = ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client, invalidName, "key", ContainerProtocolCalls.readSmallFile(client, invalidName, "key",
traceID); traceID);
xceiverClientManager.releaseClient(client);
} }
} }

View File

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm;
import com.google.common.cache.Cache;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import static org.apache.hadoop.scm
.ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
/**
* Test for XceiverClientManager caching and eviction.
*/
public class TestXceiverClientManager {
private static OzoneConfiguration config;
private static MiniOzoneCluster cluster;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@Rule
public ExpectedException exception = ExpectedException.none();
@BeforeClass
public static void init() throws IOException {
config = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(config)
.numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient = cluster
.createStorageContainerLocationClient();
}
@Test
public void testCaching() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
XceiverClientManager clientManager = new XceiverClientManager(conf);
String containerName1 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline1 =
storageContainerLocationClient.allocateContainer(containerName1);
XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
Assert.assertEquals(client1.getRefcount(), 1);
Assert.assertEquals(containerName1,
client1.getPipeline().getContainerName());
String containerName2 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline2 =
storageContainerLocationClient.allocateContainer(containerName2);
XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
Assert.assertEquals(client2.getRefcount(), 1);
Assert.assertEquals(containerName2,
client2.getPipeline().getContainerName());
XceiverClientSpi client3 = clientManager.acquireClient(pipeline1);
Assert.assertEquals(client3.getRefcount(), 2);
Assert.assertEquals(client1.getRefcount(), 2);
Assert.assertEquals(containerName1,
client3.getPipeline().getContainerName());
Assert.assertEquals(client1, client3);
clientManager.releaseClient(client1);
clientManager.releaseClient(client2);
clientManager.releaseClient(client3);
}
@Test
public void testFreeByReference() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 2);
XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<String, XceiverClientSpi> cache =
clientManager.getClientCache();
String containerName1 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline1 =
storageContainerLocationClient.allocateContainer(containerName1);
XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
Assert.assertEquals(client1.getRefcount(), 1);
Assert.assertEquals(containerName1,
client1.getPipeline().getContainerName());
String containerName2 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline2 =
storageContainerLocationClient.allocateContainer(containerName2);
XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
Assert.assertEquals(client2.getRefcount(), 1);
Assert.assertEquals(containerName2,
client2.getPipeline().getContainerName());
Assert.assertNotEquals(client1, client2);
String containerName3 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline3 =
storageContainerLocationClient.allocateContainer(containerName3);
XceiverClientSpi client3 = clientManager.acquireClient(pipeline3);
Assert.assertEquals(client3.getRefcount(), 1);
Assert.assertEquals(containerName3,
client3.getPipeline().getContainerName());
// least recent container (i.e containerName1) is evicted
XceiverClientSpi nonExistent1 = cache.getIfPresent(containerName1);
Assert.assertEquals(nonExistent1, null);
// However container call should succeed because of refcount on the client.
String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
ContainerProtocolCalls.createContainer(client1, traceID1);
// After releasing the client, this connection should be closed
// and any container operations should fail
clientManager.releaseClient(client1);
exception.expect(IOException.class);
exception.expectMessage("This channel is not connected.");
ContainerProtocolCalls.createContainer(client1, traceID1);
clientManager.releaseClient(client2);
clientManager.releaseClient(client3);
}
@Test
public void testFreeByEviction() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 2);
XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<String, XceiverClientSpi> cache =
clientManager.getClientCache();
String containerName1 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline1 =
storageContainerLocationClient.allocateContainer(containerName1);
XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
Assert.assertEquals(client1.getRefcount(), 1);
Assert.assertEquals(containerName1,
client1.getPipeline().getContainerName());
String containerName2 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline2 =
storageContainerLocationClient.allocateContainer(containerName2);
XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
Assert.assertEquals(client2.getRefcount(), 1);
Assert.assertEquals(containerName2,
client2.getPipeline().getContainerName());
Assert.assertNotEquals(client1, client2);
clientManager.releaseClient(client1);
Assert.assertEquals(client1.getRefcount(), 0);
String containerName3 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline3 =
storageContainerLocationClient.allocateContainer(containerName3);
XceiverClientSpi client3 = clientManager.acquireClient(pipeline3);
Assert.assertEquals(client3.getRefcount(), 1);
Assert.assertEquals(containerName3,
client3.getPipeline().getContainerName());
// now client 1 should be evicted
XceiverClientSpi nonExistent = cache.getIfPresent(containerName1);
Assert.assertEquals(nonExistent, null);
// Any container operation should now fail
String traceID2 = "trace" + RandomStringUtils.randomNumeric(4);
exception.expect(IOException.class);
exception.expectMessage("This channel is not connected.");
ContainerProtocolCalls.createContainer(client1, traceID2);
clientManager.releaseClient(client2);
clientManager.releaseClient(client3);
}
}