YARN-293. Node Manager leaks LocalizerRunner object for every Container. Contributed by Robert Joseph Evans
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1428095 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1f41061132
commit
16cf201042
|
@ -251,6 +251,9 @@ Release 0.23.6 - UNRELEASED
|
||||||
YARN-225. Proxy Link in RM UI thows NPE in Secure mode
|
YARN-225. Proxy Link in RM UI thows NPE in Secure mode
|
||||||
(Devaraj K via bobby)
|
(Devaraj K via bobby)
|
||||||
|
|
||||||
|
YARN-293. Node Manager leaks LocalizerRunner object for every Container
|
||||||
|
(Robert Joseph Evans via jlowe)
|
||||||
|
|
||||||
Release 0.23.5 - UNRELEASED
|
Release 0.23.5 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -17,20 +17,27 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||||
|
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.CompletionService;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
@ -41,36 +48,25 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
|
||||||
import org.apache.hadoop.security.Credentials;
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
|
|
||||||
import org.apache.hadoop.ipc.Server;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
@ -94,8 +90,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
|
||||||
|
@ -372,6 +368,8 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
|
tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
String locId = ConverterUtils.toString(c.getContainerID());
|
||||||
|
localizerTracker.cleanupPrivLocalizers(locId);
|
||||||
|
|
||||||
// Delete the container directories
|
// Delete the container directories
|
||||||
String userName = c.getUser();
|
String userName = c.getUser();
|
||||||
|
@ -520,9 +518,8 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
synchronized (privLocalizers) {
|
synchronized (privLocalizers) {
|
||||||
LocalizerRunner localizer = privLocalizers.get(locId);
|
LocalizerRunner localizer = privLocalizers.get(locId);
|
||||||
if (null == localizer) {
|
if (null == localizer) {
|
||||||
LOG.info("Created localizer for " + req.getLocalizerId());
|
LOG.info("Created localizer for " + locId);
|
||||||
localizer = new LocalizerRunner(req.getContext(),
|
localizer = new LocalizerRunner(req.getContext(), locId);
|
||||||
req.getLocalizerId());
|
|
||||||
privLocalizers.put(locId, localizer);
|
privLocalizers.put(locId, localizer);
|
||||||
localizer.start();
|
localizer.start();
|
||||||
}
|
}
|
||||||
|
@ -532,22 +529,22 @@ public class ResourceLocalizationService extends CompositeService
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case ABORT_LOCALIZATION:
|
|
||||||
// 0) find running localizer, interrupt and remove
|
|
||||||
synchronized (privLocalizers) {
|
|
||||||
LocalizerRunner localizer = privLocalizers.get(locId);
|
|
||||||
if (null == localizer) {
|
|
||||||
return; // ignore; already gone
|
|
||||||
}
|
|
||||||
privLocalizers.remove(locId);
|
|
||||||
localizer.interrupt();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void cleanupPrivLocalizers(String locId) {
|
||||||
|
synchronized (privLocalizers) {
|
||||||
|
LocalizerRunner localizer = privLocalizers.get(locId);
|
||||||
|
if (null == localizer) {
|
||||||
|
return; // ignore; already gone
|
||||||
|
}
|
||||||
|
privLocalizers.remove(locId);
|
||||||
|
localizer.interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static ExecutorService createLocalizerExecutor(Configuration conf) {
|
private static ExecutorService createLocalizerExecutor(Configuration conf) {
|
||||||
int nThreads = conf.getInt(
|
int nThreads = conf.getInt(
|
||||||
YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
|
YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
|
||||||
|
|
|
@ -19,6 +19,5 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.eve
|
||||||
|
|
||||||
public enum LocalizerEventType {
|
public enum LocalizerEventType {
|
||||||
/** See {@link LocalizerResourceRequestEvent} */
|
/** See {@link LocalizerResourceRequestEvent} */
|
||||||
REQUEST_RESOURCE_LOCALIZATION,
|
REQUEST_RESOURCE_LOCALIZATION
|
||||||
ABORT_LOCALIZATION
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -328,6 +328,8 @@ public class TestResourceLocalizationService {
|
||||||
|
|
||||||
//Send Cleanup Event
|
//Send Cleanup Event
|
||||||
spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
|
spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
|
||||||
|
verify(mockLocallilzerTracker)
|
||||||
|
.cleanupPrivLocalizers("container_314159265358979_0003_01_000042");
|
||||||
req2.remove(LocalResourceVisibility.PRIVATE);
|
req2.remove(LocalResourceVisibility.PRIVATE);
|
||||||
spyService.handle(new ContainerLocalizationCleanupEvent(c, req2));
|
spyService.handle(new ContainerLocalizationCleanupEvent(c, req2));
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
Loading…
Reference in New Issue