HDFS-12246. Ozone: potential thread leaks. Contributed by Weiwei Yang.
This commit is contained in:
parent
23dee0f123
commit
89d8d20c73
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.scm;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -53,7 +54,7 @@ import static org.apache.hadoop.scm.ScmConfigKeys
|
|||
* without reestablishing connection. But the connection will be closed if
|
||||
* not being used for a period of time.
|
||||
*/
|
||||
public class XceiverClientManager {
|
||||
public class XceiverClientManager implements Closeable {
|
||||
|
||||
//TODO : change this to SCM configuration class
|
||||
private final Configuration conf;
|
||||
|
@ -89,6 +90,7 @@ public class XceiverClientManager {
|
|||
// Mark the entry as evicted
|
||||
XceiverClientSpi info = removalNotification.getValue();
|
||||
info.setEvicted();
|
||||
info.close();
|
||||
}
|
||||
}
|
||||
}).build();
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Map;
|
|||
import com.sun.jersey.api.container.ContainerFactory;
|
||||
import com.sun.jersey.api.core.ApplicationAdapter;
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ksm.protocolPB
|
||||
.KeySpaceManagerProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
||||
|
@ -179,11 +180,8 @@ public final class ObjectStoreHandler implements Closeable {
|
|||
public void close() {
|
||||
LOG.info("Closing ObjectStoreHandler.");
|
||||
storageHandler.close();
|
||||
if (this.storageContainerLocationClient != null) {
|
||||
this.storageContainerLocationClient.close();
|
||||
}
|
||||
if (this.scmBlockLocationClient != null) {
|
||||
this.scmBlockLocationClient.close();
|
||||
}
|
||||
IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient);
|
||||
IOUtils.cleanupWithLogger(LOG, scmBlockLocationClient);
|
||||
IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.Client;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
|
@ -562,8 +563,8 @@ public class OzoneClientImpl implements OzoneClient, Closeable {
|
|||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if(xceiverClientManager != null) {
|
||||
xceiverClientManager.close();
|
||||
}
|
||||
IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient);
|
||||
IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient);
|
||||
IOUtils.cleanupWithLogger(LOG, xceiverClientManager);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,6 +53,8 @@ public class DatanodeStateMachine implements Closeable {
|
|||
private final CommandDispatcher commandDispatcher;
|
||||
private long commandsHandled;
|
||||
private AtomicLong nextHB;
|
||||
private Thread stateMachineThread = null;
|
||||
private Thread cmdProcessThread = null;
|
||||
|
||||
/**
|
||||
* Constructs a a datanode state machine.
|
||||
|
@ -136,6 +138,8 @@ public class DatanodeStateMachine implements Closeable {
|
|||
if (now < nextHB.get()) {
|
||||
Thread.sleep(nextHB.get() - now);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore this exception.
|
||||
} catch (Exception e) {
|
||||
LOG.error("Unable to finish the execution.", e);
|
||||
}
|
||||
|
@ -173,6 +177,12 @@ public class DatanodeStateMachine implements Closeable {
|
|||
*/
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (stateMachineThread != null) {
|
||||
stateMachineThread.interrupt();
|
||||
}
|
||||
if (cmdProcessThread != null) {
|
||||
cmdProcessThread.interrupt();
|
||||
}
|
||||
context.setState(DatanodeStates.getLastState());
|
||||
executorService.shutdown();
|
||||
try {
|
||||
|
@ -189,8 +199,8 @@ public class DatanodeStateMachine implements Closeable {
|
|||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
|
||||
endPoint.close();
|
||||
if (connectionManager != null) {
|
||||
connectionManager.close();
|
||||
}
|
||||
|
||||
if(container != null) {
|
||||
|
@ -275,11 +285,11 @@ public class DatanodeStateMachine implements Closeable {
|
|||
LOG.error("Unable to start the DatanodeState Machine", ex);
|
||||
}
|
||||
};
|
||||
Thread thread = new ThreadFactoryBuilder()
|
||||
stateMachineThread = new ThreadFactoryBuilder()
|
||||
.setDaemon(true)
|
||||
.setNameFormat("Datanode State Machine Thread - %d")
|
||||
.build().newThread(startStateMachineTask);
|
||||
thread.start();
|
||||
stateMachineThread.start();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -344,7 +354,7 @@ public class DatanodeStateMachine implements Closeable {
|
|||
};
|
||||
|
||||
// We will have only one thread for command processing in a datanode.
|
||||
Thread cmdProcessThread = new Thread(processCommandQueue);
|
||||
cmdProcessThread = new Thread(processCommandQueue);
|
||||
cmdProcessThread.setDaemon(true);
|
||||
cmdProcessThread.setName("Command processor thread");
|
||||
cmdProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.hadoop.ozone.container.common.statemachine;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
@ -28,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
|
@ -40,7 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
* SCMConnectionManager - Acts as a class that manages the membership
|
||||
* information of the SCMs that we are working with.
|
||||
*/
|
||||
public class SCMConnectionManager {
|
||||
public class SCMConnectionManager implements Closeable{
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(SCMConnectionManager.class);
|
||||
|
||||
|
@ -132,6 +134,7 @@ public class SCMConnectionManager {
|
|||
|
||||
StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
|
||||
new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
|
||||
|
||||
EndpointStateMachine endPoint =
|
||||
new EndpointStateMachine(address, rpcClient, conf);
|
||||
scmMachines.put(address, endPoint);
|
||||
|
@ -171,4 +174,10 @@ public class SCMConnectionManager {
|
|||
public Collection<EndpointStateMachine> getValues() {
|
||||
return scmMachines.values();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
getValues().forEach(endpointStateMachine
|
||||
-> IOUtils.cleanupWithLogger(LOG, endpointStateMachine));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -97,6 +97,9 @@ public final class XceiverServer implements XceiverServerSpi {
|
|||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (storageContainer != null) {
|
||||
storageContainer.shutdown();
|
||||
}
|
||||
if (bossGroup != null) {
|
||||
bossGroup.shutdownGracefully();
|
||||
}
|
||||
|
|
|
@ -21,10 +21,10 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.BlockingService;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
|
@ -610,8 +610,9 @@ public class StorageContainerManager
|
|||
}
|
||||
|
||||
unregisterMXBean();
|
||||
IOUtils.closeQuietly(scmContainerManager);
|
||||
IOUtils.closeQuietly(scmBlockManager);
|
||||
IOUtils.cleanupWithLogger(LOG, scmContainerManager);
|
||||
IOUtils.cleanupWithLogger(LOG, scmBlockManager);
|
||||
IOUtils.cleanupWithLogger(LOG, scmNodeManager);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.web.storage;
|
|||
import com.google.common.base.Strings;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset
|
||||
.LengthInputStream;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||
|
@ -496,8 +497,8 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
*/
|
||||
@Override
|
||||
public void close() {
|
||||
if(xceiverClientManager != null) {
|
||||
xceiverClientManager.close();
|
||||
}
|
||||
IOUtils.cleanupWithLogger(LOG, xceiverClientManager);
|
||||
IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient);
|
||||
IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue