From 245c6fed97a8dc7979a68f6de52aaf9cc812287e Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Tue, 6 Jun 2017 21:20:20 +0800 Subject: [PATCH] HDFS-11887. Shared XceiverClient should be closed if there is no open clients to avoid resource leak. Contributed by Mukul Kumar Singh. --- .../org/apache/hadoop/scm/ScmConfigKeys.java | 5 + .../org/apache/hadoop/scm/XceiverClient.java | 3 +- .../hadoop/scm/XceiverClientManager.java | 150 ++++++-------- .../apache/hadoop/scm/XceiverClientRatis.java | 3 +- .../apache/hadoop/scm/XceiverClientSpi.java | 55 ++++- .../cache/impl/AsyncBlockWriter.java | 6 +- .../hadoop/cblock/TestBufferManager.java | 1 + .../hadoop/cblock/TestLocalBlockCache.java | 1 + .../ozone/scm/TestContainerSmallFile.java | 3 + .../ozone/scm/TestXceiverClientManager.java | 194 ++++++++++++++++++ 10 files changed, 324 insertions(+), 97 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index 9553171be8b..d34325483e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -35,6 +35,11 @@ public final class ScmConfigKeys { public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT = 10000; + public static final String SCM_CONTAINER_CLIENT_MAX_SIZE_KEY = + "scm.container.client.max.size"; + public static final int SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT = + 256; + public static final String DFS_CONTAINER_RATIS_ENABLED_KEY = "dfs.container.ratis.enabled"; public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java index fc9092a6e89..eb61709a200 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java @@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit; /** * A Client for the storageContainer protocol. */ -public class XceiverClient implements XceiverClientSpi { +public class XceiverClient extends XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); private final Pipeline pipeline; private final Configuration config; @@ -55,6 +55,7 @@ public class XceiverClient implements XceiverClientSpi { * @param config -- Ozone Config */ public XceiverClient(Pipeline pipeline, Configuration config) { + super(); Preconditions.checkNotNull(pipeline); Preconditions.checkNotNull(config); this.pipeline = pipeline; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java index 637268b14e9..a8373481195 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java @@ -20,8 +20,9 @@ package org.apache.hadoop.scm; import java.io.IOException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Callable; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.cache.Cache; @@ -31,8 +32,14 @@ import com.google.common.cache.RemovalNotification; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY; +import static org.apache.hadoop.scm.ScmConfigKeys + .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT; +import static org.apache.hadoop.scm.ScmConfigKeys + .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY; +import static org.apache.hadoop.scm.ScmConfigKeys + .SCM_CONTAINER_CLIENT_MAX_SIZE_KEY; +import static org.apache.hadoop.scm.ScmConfigKeys + .SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT; /** * XceiverClientManager is responsible for the lifecycle of XceiverClient @@ -50,8 +57,7 @@ public class XceiverClientManager { //TODO : change this to SCM configuration class private final Configuration conf; - private Cache openClient; - private final long staleThresholdMs; + private final Cache clientCache; private final boolean useRatis; /** @@ -61,121 +67,91 @@ public class XceiverClientManager { */ public XceiverClientManager(Configuration conf) { Preconditions.checkNotNull(conf); - this.staleThresholdMs = conf.getTimeDuration( + int maxSize = conf.getInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, + SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT); + long staleThresholdMs = conf.getTimeDuration( SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY, SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS); this.useRatis = conf.getBoolean( ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT); this.conf = conf; - this.openClient = CacheBuilder.newBuilder() - .expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS) + this.clientCache = CacheBuilder.newBuilder() + .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS) + .maximumSize(maxSize) .removalListener( - new RemovalListener() { + new RemovalListener() { @Override public void onRemoval( - RemovalNotification + RemovalNotification removalNotification) { - // If the reference count is not 0, this xceiver client should not - // be evicted, add it back to the cache. - XceiverClientWithAccessInfo info = removalNotification.getValue(); - if (info.hasRefence()) { - synchronized (XceiverClientManager.this.openClient) { - XceiverClientManager.this - .openClient.put(removalNotification.getKey(), info); - } + synchronized (clientCache) { + // Mark the entry as evicted + XceiverClientSpi info = removalNotification.getValue(); + info.setEvicted(); } } }).build(); } + @VisibleForTesting + public Cache getClientCache() { + return clientCache; + } + /** - * Acquires a XceiverClient connected to a container capable of storing the - * specified key. + * Acquires a SharedXceiverClient connected to a container capable of + * storing the specified key. * - * If there is already a cached XceiverClient, simply return the cached - * otherwise create a new one. + * If there is already a cached SharedXceiverClient, simply return + * the cached otherwise create a new one. * * @param pipeline the container pipeline for the client connection - * @return XceiverClient connected to a container - * @throws IOException if an XceiverClient cannot be acquired + * @return SharedXceiverClient connected to a container + * @throws IOException if an SharedXceiverClient cannot be acquired */ - public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException { + public XceiverClientSpi acquireClient(Pipeline pipeline) + throws IOException { Preconditions.checkNotNull(pipeline); Preconditions.checkArgument(pipeline.getMachines() != null); Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); - String containerName = pipeline.getContainerName(); - synchronized(openClient) { - XceiverClientWithAccessInfo info = - openClient.getIfPresent(containerName); - if (info != null) { - // we do have this connection, add reference and return - info.incrementReference(); - return info.getXceiverClient(); - } else { - // connection not found, create new, add reference and return - final XceiverClientSpi xceiverClient = useRatis ? - XceiverClientRatis.newXceiverClientRatis(pipeline, conf) - : new XceiverClient(pipeline, conf); - try { - xceiverClient.connect(); - } catch (Exception e) { - throw new IOException("Exception connecting XceiverClient.", e); - } - info = new XceiverClientWithAccessInfo(xceiverClient); - info.incrementReference(); - openClient.put(containerName, info); - return xceiverClient; - } + synchronized (clientCache) { + XceiverClientSpi info = getClient(pipeline); + info.incrementReference(); + return info; } } /** - * Releases an XceiverClient after use. + * Releases an SharedXceiverClient after use. * - * @param xceiverClient client to release + * @param client client to release */ - public void releaseClient(XceiverClientSpi xceiverClient) { - Preconditions.checkNotNull(xceiverClient); - String containerName = xceiverClient.getPipeline().getContainerName(); - XceiverClientWithAccessInfo info; - synchronized (openClient) { - info = openClient.getIfPresent(containerName); - Preconditions.checkNotNull(info); - info.decrementReference(); + public void releaseClient(XceiverClientSpi client) { + Preconditions.checkNotNull(client); + synchronized (clientCache) { + client.decrementReference(); } } - /** - * A helper class for caching and cleaning XceiverClient. Three parameters: - * - the actual XceiverClient object - * - a time stamp representing the most recent access (acquire or release) - * - a reference count, +1 when acquire, -1 when release - */ - private static class XceiverClientWithAccessInfo { - final private XceiverClientSpi xceiverClient; - final private AtomicInteger referenceCount; - - XceiverClientWithAccessInfo(XceiverClientSpi xceiverClient) { - this.xceiverClient = xceiverClient; - this.referenceCount = new AtomicInteger(0); - } - - void incrementReference() { - this.referenceCount.incrementAndGet(); - } - - void decrementReference() { - this.referenceCount.decrementAndGet(); - } - - boolean hasRefence() { - return this.referenceCount.get() != 0; - } - - XceiverClientSpi getXceiverClient() { - return xceiverClient; + private XceiverClientSpi getClient(Pipeline pipeline) + throws IOException { + String containerName = pipeline.getContainerName(); + try { + return clientCache.get(containerName, + new Callable() { + @Override + public XceiverClientSpi call() throws Exception { + XceiverClientSpi client = useRatis ? + XceiverClientRatis.newXceiverClientRatis(pipeline, conf) + : new XceiverClient(pipeline, conf); + client.connect(); + return client; + } + }); + } catch (Exception e) { + throw new IOException("Exception getting XceiverClient.", e); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java index a0ad24eb076..a1ef114b2f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java @@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReference; * An abstract implementation of {@link XceiverClientSpi} using Ratis. * The underlying RPC mechanism can be chosen via the constructor. */ -public final class XceiverClientRatis implements XceiverClientSpi { +public final class XceiverClientRatis extends XceiverClientSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class); public static XceiverClientRatis newXceiverClientRatis( @@ -58,6 +58,7 @@ public final class XceiverClientRatis implements XceiverClientSpi { /** Constructs a client. */ private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) { + super(); this.pipeline = pipeline; this.rpcType = rpcType; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java index 1cf5a2859c3..b48fed04a59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java @@ -18,24 +18,65 @@ package org.apache.hadoop.scm; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.ozone.protocol.proto + .ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto + .ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; /** * A Client for the storageContainer protocol. */ -public interface XceiverClientSpi extends Closeable { +public abstract class XceiverClientSpi implements Closeable { + + final private AtomicInteger referenceCount; + private boolean isEvicted; + + XceiverClientSpi() { + this.referenceCount = new AtomicInteger(0); + this.isEvicted = false; + } + + void incrementReference() { + this.referenceCount.incrementAndGet(); + } + + void decrementReference() { + this.referenceCount.decrementAndGet(); + cleanup(); + } + + void setEvicted() { + isEvicted = true; + cleanup(); + } + + // close the xceiverClient only if, + // 1) there is no refcount on the client + // 2) it has been evicted from the cache. + private void cleanup() { + if (referenceCount.get() == 0 && isEvicted) { + close(); + } + } + + @VisibleForTesting + public int getRefcount() { + return referenceCount.get(); + } + /** * Connects to the leader in the pipeline. */ - void connect() throws Exception; + public abstract void connect() throws Exception; @Override - void close(); + public abstract void close(); /** * Returns the pipeline of machines that host the container used by this @@ -43,7 +84,7 @@ public interface XceiverClientSpi extends Closeable { * * @return pipeline of machines that host the container */ - Pipeline getPipeline(); + public abstract Pipeline getPipeline(); /** * Sends a given command to server and gets the reply back. @@ -51,6 +92,6 @@ public interface XceiverClientSpi extends Closeable { * @return Response to the command * @throws IOException */ - ContainerCommandResponseProto sendCommand( + public abstract ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java index 73e17ddd159..30817c6dc97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java @@ -169,9 +169,10 @@ public class AsyncBlockWriter { } else { Pipeline pipeline = parentCache.getPipeline(block.getBlockID()); String containerName = pipeline.getContainerName(); + XceiverClientSpi client = null; try { long startTime = Time.monotonicNow(); - XceiverClientSpi client = parentCache.getClientManager() + client = parentCache.getClientManager() .acquireClient(parentCache.getPipeline(block.getBlockID())); // BUG: fix the trace ID. ContainerProtocolCalls.writeSmallFile(client, containerName, @@ -192,6 +193,9 @@ public class AsyncBlockWriter { block.getBlockID(), containerName, ex); throw ex; } finally { + if (client != null) { + parentCache.getClientManager().releaseClient(client); + } block.clearData(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java index ec188ddd63a..e38ca71e284 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java @@ -112,6 +112,7 @@ public class TestBufferManager { // read the list from CBlockServer. So we mimic that action here. pipeline.setData(Longs.toByteArray(x)); containerPipelines.add(pipeline); + xceiverClientManager.releaseClient(client); } return containerPipelines; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java index c54a2140941..eec417a2ded 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java @@ -123,6 +123,7 @@ public class TestLocalBlockCache { // read the list from CBlockServer. So we mimic that action here. pipeline.setData(Longs.toByteArray(x)); containerPipelines.add(pipeline); + xceiverClientManager.releaseClient(client); } return containerPipelines; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java index b6576360c5b..deaad877f4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java @@ -93,6 +93,7 @@ public class TestContainerSmallFile { traceID); String readData = response.getData().getData().toStringUtf8(); Assert.assertEquals("data123", readData); + xceiverClientManager.releaseClient(client); } @Test @@ -111,6 +112,7 @@ public class TestContainerSmallFile { ContainerProtos.GetSmallFileResponseProto response = ContainerProtocolCalls.readSmallFile(client, containerName, "key", traceID); + xceiverClientManager.releaseClient(client); } @Test @@ -133,6 +135,7 @@ public class TestContainerSmallFile { ContainerProtos.GetSmallFileResponseProto response = ContainerProtocolCalls.readSmallFile(client, invalidName, "key", traceID); + xceiverClientManager.releaseClient(client); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java new file mode 100644 index 00000000000..73003846e20 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.scm; + +import com.google.common.cache.Cache; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.protocolPB + .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; + +import static org.apache.hadoop.scm + .ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY; + +/** + * Test for XceiverClientManager caching and eviction. + */ +public class TestXceiverClientManager { + private static OzoneConfiguration config; + private static MiniOzoneCluster cluster; + private static StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void init() throws IOException { + config = new OzoneConfiguration(); + cluster = new MiniOzoneCluster.Builder(config) + .numDataNodes(1) + .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); + storageContainerLocationClient = cluster + .createStorageContainerLocationClient(); + } + + @Test + public void testCaching() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + XceiverClientManager clientManager = new XceiverClientManager(conf); + + String containerName1 = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline1 = + storageContainerLocationClient.allocateContainer(containerName1); + XceiverClientSpi client1 = clientManager.acquireClient(pipeline1); + Assert.assertEquals(client1.getRefcount(), 1); + Assert.assertEquals(containerName1, + client1.getPipeline().getContainerName()); + + String containerName2 = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline2 = + storageContainerLocationClient.allocateContainer(containerName2); + XceiverClientSpi client2 = clientManager.acquireClient(pipeline2); + Assert.assertEquals(client2.getRefcount(), 1); + Assert.assertEquals(containerName2, + client2.getPipeline().getContainerName()); + + XceiverClientSpi client3 = clientManager.acquireClient(pipeline1); + Assert.assertEquals(client3.getRefcount(), 2); + Assert.assertEquals(client1.getRefcount(), 2); + Assert.assertEquals(containerName1, + client3.getPipeline().getContainerName()); + Assert.assertEquals(client1, client3); + clientManager.releaseClient(client1); + clientManager.releaseClient(client2); + clientManager.releaseClient(client3); + } + + @Test + public void testFreeByReference() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 2); + XceiverClientManager clientManager = new XceiverClientManager(conf); + Cache cache = + clientManager.getClientCache(); + + String containerName1 = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline1 = + storageContainerLocationClient.allocateContainer(containerName1); + XceiverClientSpi client1 = clientManager.acquireClient(pipeline1); + Assert.assertEquals(client1.getRefcount(), 1); + Assert.assertEquals(containerName1, + client1.getPipeline().getContainerName()); + + String containerName2 = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline2 = + storageContainerLocationClient.allocateContainer(containerName2); + XceiverClientSpi client2 = clientManager.acquireClient(pipeline2); + Assert.assertEquals(client2.getRefcount(), 1); + Assert.assertEquals(containerName2, + client2.getPipeline().getContainerName()); + Assert.assertNotEquals(client1, client2); + + String containerName3 = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline3 = + storageContainerLocationClient.allocateContainer(containerName3); + XceiverClientSpi client3 = clientManager.acquireClient(pipeline3); + Assert.assertEquals(client3.getRefcount(), 1); + Assert.assertEquals(containerName3, + client3.getPipeline().getContainerName()); + + // least recent container (i.e containerName1) is evicted + XceiverClientSpi nonExistent1 = cache.getIfPresent(containerName1); + Assert.assertEquals(nonExistent1, null); + // However container call should succeed because of refcount on the client. + String traceID1 = "trace" + RandomStringUtils.randomNumeric(4); + ContainerProtocolCalls.createContainer(client1, traceID1); + + // After releasing the client, this connection should be closed + // and any container operations should fail + clientManager.releaseClient(client1); + exception.expect(IOException.class); + exception.expectMessage("This channel is not connected."); + ContainerProtocolCalls.createContainer(client1, traceID1); + clientManager.releaseClient(client2); + clientManager.releaseClient(client3); + } + + @Test + public void testFreeByEviction() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 2); + XceiverClientManager clientManager = new XceiverClientManager(conf); + Cache cache = + clientManager.getClientCache(); + + String containerName1 = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline1 = + storageContainerLocationClient.allocateContainer(containerName1); + XceiverClientSpi client1 = clientManager.acquireClient(pipeline1); + Assert.assertEquals(client1.getRefcount(), 1); + Assert.assertEquals(containerName1, + client1.getPipeline().getContainerName()); + + String containerName2 = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline2 = + storageContainerLocationClient.allocateContainer(containerName2); + XceiverClientSpi client2 = clientManager.acquireClient(pipeline2); + Assert.assertEquals(client2.getRefcount(), 1); + Assert.assertEquals(containerName2, + client2.getPipeline().getContainerName()); + Assert.assertNotEquals(client1, client2); + + clientManager.releaseClient(client1); + Assert.assertEquals(client1.getRefcount(), 0); + + String containerName3 = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline3 = + storageContainerLocationClient.allocateContainer(containerName3); + XceiverClientSpi client3 = clientManager.acquireClient(pipeline3); + Assert.assertEquals(client3.getRefcount(), 1); + Assert.assertEquals(containerName3, + client3.getPipeline().getContainerName()); + + // now client 1 should be evicted + XceiverClientSpi nonExistent = cache.getIfPresent(containerName1); + Assert.assertEquals(nonExistent, null); + + // Any container operation should now fail + String traceID2 = "trace" + RandomStringUtils.randomNumeric(4); + exception.expect(IOException.class); + exception.expectMessage("This channel is not connected."); + ContainerProtocolCalls.createContainer(client1, traceID2); + clientManager.releaseClient(client2); + clientManager.releaseClient(client3); + } +}