HDFS-11887. Shared XceiverClient should be closed if there is no open clients to avoid resource leak. Contributed by Mukul Kumar Singh.

This commit is contained in:
Weiwei Yang 2017-06-06 21:20:20 +08:00 committed by Owen O'Malley
parent f5d17b8f7c
commit d6dd557b24
10 changed files with 324 additions and 97 deletions

View File

@ -35,6 +35,11 @@ public final class ScmConfigKeys {
public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT =
10000;
public static final String SCM_CONTAINER_CLIENT_MAX_SIZE_KEY =
"scm.container.client.max.size";
public static final int SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT =
256;
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= "dfs.container.ratis.enabled";
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT

View File

@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit;
/**
* A Client for the storageContainer protocol.
*/
public class XceiverClient implements XceiverClientSpi {
public class XceiverClient extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
private final Pipeline pipeline;
private final Configuration config;
@ -55,6 +55,7 @@ public class XceiverClient implements XceiverClientSpi {
* @param config -- Ozone Config
*/
public XceiverClient(Pipeline pipeline, Configuration config) {
super();
Preconditions.checkNotNull(pipeline);
Preconditions.checkNotNull(config);
this.pipeline = pipeline;

View File

@ -20,8 +20,9 @@ package org.apache.hadoop.scm;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Callable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
@ -31,8 +32,14 @@ import com.google.common.cache.RemovalNotification;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT;
/**
* XceiverClientManager is responsible for the lifecycle of XceiverClient
@ -50,8 +57,7 @@ public class XceiverClientManager {
//TODO : change this to SCM configuration class
private final Configuration conf;
private Cache<String, XceiverClientWithAccessInfo> openClient;
private final long staleThresholdMs;
private final Cache<String, XceiverClientSpi> clientCache;
private final boolean useRatis;
/**
@ -61,121 +67,91 @@ public class XceiverClientManager {
*/
public XceiverClientManager(Configuration conf) {
Preconditions.checkNotNull(conf);
this.staleThresholdMs = conf.getTimeDuration(
int maxSize = conf.getInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT);
long staleThresholdMs = conf.getTimeDuration(
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
this.useRatis = conf.getBoolean(
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
this.conf = conf;
this.openClient = CacheBuilder.newBuilder()
.expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS)
this.clientCache = CacheBuilder.newBuilder()
.expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
.maximumSize(maxSize)
.removalListener(
new RemovalListener<String, XceiverClientWithAccessInfo>() {
new RemovalListener<String, XceiverClientSpi>() {
@Override
public void onRemoval(
RemovalNotification<String, XceiverClientWithAccessInfo>
RemovalNotification<String, XceiverClientSpi>
removalNotification) {
// If the reference count is not 0, this xceiver client should not
// be evicted, add it back to the cache.
XceiverClientWithAccessInfo info = removalNotification.getValue();
if (info.hasRefence()) {
synchronized (XceiverClientManager.this.openClient) {
XceiverClientManager.this
.openClient.put(removalNotification.getKey(), info);
}
synchronized (clientCache) {
// Mark the entry as evicted
XceiverClientSpi info = removalNotification.getValue();
info.setEvicted();
}
}
}).build();
}
@VisibleForTesting
public Cache<String, XceiverClientSpi> getClientCache() {
return clientCache;
}
/**
* Acquires a XceiverClient connected to a container capable of storing the
* specified key.
* Acquires a SharedXceiverClient connected to a container capable of
* storing the specified key.
*
* If there is already a cached XceiverClient, simply return the cached
* otherwise create a new one.
* If there is already a cached SharedXceiverClient, simply return
* the cached otherwise create a new one.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClient connected to a container
* @throws IOException if an XceiverClient cannot be acquired
* @return SharedXceiverClient connected to a container
* @throws IOException if an SharedXceiverClient cannot be acquired
*/
public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException {
public XceiverClientSpi acquireClient(Pipeline pipeline)
throws IOException {
Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getMachines() != null);
Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
String containerName = pipeline.getContainerName();
synchronized(openClient) {
XceiverClientWithAccessInfo info =
openClient.getIfPresent(containerName);
if (info != null) {
// we do have this connection, add reference and return
info.incrementReference();
return info.getXceiverClient();
} else {
// connection not found, create new, add reference and return
final XceiverClientSpi xceiverClient = useRatis ?
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
: new XceiverClient(pipeline, conf);
try {
xceiverClient.connect();
} catch (Exception e) {
throw new IOException("Exception connecting XceiverClient.", e);
}
info = new XceiverClientWithAccessInfo(xceiverClient);
info.incrementReference();
openClient.put(containerName, info);
return xceiverClient;
}
synchronized (clientCache) {
XceiverClientSpi info = getClient(pipeline);
info.incrementReference();
return info;
}
}
/**
* Releases an XceiverClient after use.
* Releases an SharedXceiverClient after use.
*
* @param xceiverClient client to release
* @param client client to release
*/
public void releaseClient(XceiverClientSpi xceiverClient) {
Preconditions.checkNotNull(xceiverClient);
String containerName = xceiverClient.getPipeline().getContainerName();
XceiverClientWithAccessInfo info;
synchronized (openClient) {
info = openClient.getIfPresent(containerName);
Preconditions.checkNotNull(info);
info.decrementReference();
public void releaseClient(XceiverClientSpi client) {
Preconditions.checkNotNull(client);
synchronized (clientCache) {
client.decrementReference();
}
}
/**
* A helper class for caching and cleaning XceiverClient. Three parameters:
* - the actual XceiverClient object
* - a time stamp representing the most recent access (acquire or release)
* - a reference count, +1 when acquire, -1 when release
*/
private static class XceiverClientWithAccessInfo {
final private XceiverClientSpi xceiverClient;
final private AtomicInteger referenceCount;
XceiverClientWithAccessInfo(XceiverClientSpi xceiverClient) {
this.xceiverClient = xceiverClient;
this.referenceCount = new AtomicInteger(0);
}
void incrementReference() {
this.referenceCount.incrementAndGet();
}
void decrementReference() {
this.referenceCount.decrementAndGet();
}
boolean hasRefence() {
return this.referenceCount.get() != 0;
}
XceiverClientSpi getXceiverClient() {
return xceiverClient;
private XceiverClientSpi getClient(Pipeline pipeline)
throws IOException {
String containerName = pipeline.getContainerName();
try {
return clientCache.get(containerName,
new Callable<XceiverClientSpi>() {
@Override
public XceiverClientSpi call() throws Exception {
XceiverClientSpi client = useRatis ?
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
: new XceiverClient(pipeline, conf);
client.connect();
return client;
}
});
} catch (Exception e) {
throw new IOException("Exception getting XceiverClient.", e);
}
}
}

View File

@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReference;
* An abstract implementation of {@link XceiverClientSpi} using Ratis.
* The underlying RPC mechanism can be chosen via the constructor.
*/
public final class XceiverClientRatis implements XceiverClientSpi {
public final class XceiverClientRatis extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
public static XceiverClientRatis newXceiverClientRatis(
@ -58,6 +58,7 @@ public final class XceiverClientRatis implements XceiverClientSpi {
/** Constructs a client. */
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) {
super();
this.pipeline = pipeline;
this.rpcType = rpcType;
}

View File

@ -18,24 +18,65 @@
package org.apache.hadoop.scm;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.ozone.protocol.proto
.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto
.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A Client for the storageContainer protocol.
*/
public interface XceiverClientSpi extends Closeable {
public abstract class XceiverClientSpi implements Closeable {
final private AtomicInteger referenceCount;
private boolean isEvicted;
XceiverClientSpi() {
this.referenceCount = new AtomicInteger(0);
this.isEvicted = false;
}
void incrementReference() {
this.referenceCount.incrementAndGet();
}
void decrementReference() {
this.referenceCount.decrementAndGet();
cleanup();
}
void setEvicted() {
isEvicted = true;
cleanup();
}
// close the xceiverClient only if,
// 1) there is no refcount on the client
// 2) it has been evicted from the cache.
private void cleanup() {
if (referenceCount.get() == 0 && isEvicted) {
close();
}
}
@VisibleForTesting
public int getRefcount() {
return referenceCount.get();
}
/**
* Connects to the leader in the pipeline.
*/
void connect() throws Exception;
public abstract void connect() throws Exception;
@Override
void close();
public abstract void close();
/**
* Returns the pipeline of machines that host the container used by this
@ -43,7 +84,7 @@ public interface XceiverClientSpi extends Closeable {
*
* @return pipeline of machines that host the container
*/
Pipeline getPipeline();
public abstract Pipeline getPipeline();
/**
* Sends a given command to server and gets the reply back.
@ -51,6 +92,6 @@ public interface XceiverClientSpi extends Closeable {
* @return Response to the command
* @throws IOException
*/
ContainerCommandResponseProto sendCommand(
public abstract ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException;
}

View File

@ -169,9 +169,10 @@ public class AsyncBlockWriter {
} else {
Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
String containerName = pipeline.getContainerName();
XceiverClientSpi client = null;
try {
long startTime = Time.monotonicNow();
XceiverClientSpi client = parentCache.getClientManager()
client = parentCache.getClientManager()
.acquireClient(parentCache.getPipeline(block.getBlockID()));
// BUG: fix the trace ID.
ContainerProtocolCalls.writeSmallFile(client, containerName,
@ -192,6 +193,9 @@ public class AsyncBlockWriter {
block.getBlockID(), containerName, ex);
throw ex;
} finally {
if (client != null) {
parentCache.getClientManager().releaseClient(client);
}
block.clearData();
}
}

View File

@ -112,6 +112,7 @@ public class TestBufferManager {
// read the list from CBlockServer. So we mimic that action here.
pipeline.setData(Longs.toByteArray(x));
containerPipelines.add(pipeline);
xceiverClientManager.releaseClient(client);
}
return containerPipelines;
}

View File

@ -123,6 +123,7 @@ public class TestLocalBlockCache {
// read the list from CBlockServer. So we mimic that action here.
pipeline.setData(Longs.toByteArray(x));
containerPipelines.add(pipeline);
xceiverClientManager.releaseClient(client);
}
return containerPipelines;
}

View File

@ -93,6 +93,7 @@ public class TestContainerSmallFile {
traceID);
String readData = response.getData().getData().toStringUtf8();
Assert.assertEquals("data123", readData);
xceiverClientManager.releaseClient(client);
}
@Test
@ -111,6 +112,7 @@ public class TestContainerSmallFile {
ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client, containerName, "key",
traceID);
xceiverClientManager.releaseClient(client);
}
@Test
@ -133,6 +135,7 @@ public class TestContainerSmallFile {
ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client, invalidName, "key",
traceID);
xceiverClientManager.releaseClient(client);
}
}

View File

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm;
import com.google.common.cache.Cache;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import static org.apache.hadoop.scm
.ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
/**
* Test for XceiverClientManager caching and eviction.
*/
public class TestXceiverClientManager {
private static OzoneConfiguration config;
private static MiniOzoneCluster cluster;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@Rule
public ExpectedException exception = ExpectedException.none();
@BeforeClass
public static void init() throws IOException {
config = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(config)
.numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageContainerLocationClient = cluster
.createStorageContainerLocationClient();
}
@Test
public void testCaching() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
XceiverClientManager clientManager = new XceiverClientManager(conf);
String containerName1 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline1 =
storageContainerLocationClient.allocateContainer(containerName1);
XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
Assert.assertEquals(client1.getRefcount(), 1);
Assert.assertEquals(containerName1,
client1.getPipeline().getContainerName());
String containerName2 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline2 =
storageContainerLocationClient.allocateContainer(containerName2);
XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
Assert.assertEquals(client2.getRefcount(), 1);
Assert.assertEquals(containerName2,
client2.getPipeline().getContainerName());
XceiverClientSpi client3 = clientManager.acquireClient(pipeline1);
Assert.assertEquals(client3.getRefcount(), 2);
Assert.assertEquals(client1.getRefcount(), 2);
Assert.assertEquals(containerName1,
client3.getPipeline().getContainerName());
Assert.assertEquals(client1, client3);
clientManager.releaseClient(client1);
clientManager.releaseClient(client2);
clientManager.releaseClient(client3);
}
@Test
public void testFreeByReference() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 2);
XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<String, XceiverClientSpi> cache =
clientManager.getClientCache();
String containerName1 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline1 =
storageContainerLocationClient.allocateContainer(containerName1);
XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
Assert.assertEquals(client1.getRefcount(), 1);
Assert.assertEquals(containerName1,
client1.getPipeline().getContainerName());
String containerName2 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline2 =
storageContainerLocationClient.allocateContainer(containerName2);
XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
Assert.assertEquals(client2.getRefcount(), 1);
Assert.assertEquals(containerName2,
client2.getPipeline().getContainerName());
Assert.assertNotEquals(client1, client2);
String containerName3 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline3 =
storageContainerLocationClient.allocateContainer(containerName3);
XceiverClientSpi client3 = clientManager.acquireClient(pipeline3);
Assert.assertEquals(client3.getRefcount(), 1);
Assert.assertEquals(containerName3,
client3.getPipeline().getContainerName());
// least recent container (i.e containerName1) is evicted
XceiverClientSpi nonExistent1 = cache.getIfPresent(containerName1);
Assert.assertEquals(nonExistent1, null);
// However container call should succeed because of refcount on the client.
String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
ContainerProtocolCalls.createContainer(client1, traceID1);
// After releasing the client, this connection should be closed
// and any container operations should fail
clientManager.releaseClient(client1);
exception.expect(IOException.class);
exception.expectMessage("This channel is not connected.");
ContainerProtocolCalls.createContainer(client1, traceID1);
clientManager.releaseClient(client2);
clientManager.releaseClient(client3);
}
@Test
public void testFreeByEviction() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 2);
XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<String, XceiverClientSpi> cache =
clientManager.getClientCache();
String containerName1 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline1 =
storageContainerLocationClient.allocateContainer(containerName1);
XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
Assert.assertEquals(client1.getRefcount(), 1);
Assert.assertEquals(containerName1,
client1.getPipeline().getContainerName());
String containerName2 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline2 =
storageContainerLocationClient.allocateContainer(containerName2);
XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
Assert.assertEquals(client2.getRefcount(), 1);
Assert.assertEquals(containerName2,
client2.getPipeline().getContainerName());
Assert.assertNotEquals(client1, client2);
clientManager.releaseClient(client1);
Assert.assertEquals(client1.getRefcount(), 0);
String containerName3 = "container" + RandomStringUtils.randomNumeric(10);
Pipeline pipeline3 =
storageContainerLocationClient.allocateContainer(containerName3);
XceiverClientSpi client3 = clientManager.acquireClient(pipeline3);
Assert.assertEquals(client3.getRefcount(), 1);
Assert.assertEquals(containerName3,
client3.getPipeline().getContainerName());
// now client 1 should be evicted
XceiverClientSpi nonExistent = cache.getIfPresent(containerName1);
Assert.assertEquals(nonExistent, null);
// Any container operation should now fail
String traceID2 = "trace" + RandomStringUtils.randomNumeric(4);
exception.expect(IOException.class);
exception.expectMessage("This channel is not connected.");
ContainerProtocolCalls.createContainer(client1, traceID2);
clientManager.releaseClient(client2);
clientManager.releaseClient(client3);
}
}