MAPREDUCE-3161. svn merge -c r1181622 --ignore-ancestry ../../trunk/
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1181623 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8d0daf8843
commit
ddaf7c8a7f
|
@ -325,6 +325,9 @@ Release 0.23.0 - Unreleased
|
|||
MAPREDUCE-2988. Reenabled TestLinuxContainerExecutor reflecting the
|
||||
current NodeManager code. (Robert Joseph Evans via vinodkv)
|
||||
|
||||
MAPREDUCE-3161. Improved some javadocs and fixed some typos in
|
||||
YARN. (Todd Lipcon via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
|
||||
|
|
|
@ -135,9 +135,9 @@ public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase
|
|||
|
||||
lambda
|
||||
= conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
|
||||
MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMNOOTH_LAMBDA_MS);
|
||||
MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
|
||||
smoothedValue
|
||||
= conf.getBoolean(MRJobConfig.MR_AM_TASK_EXTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
|
||||
= conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
|
||||
? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -384,11 +384,11 @@ public interface MRJobConfig {
|
|||
MR_AM_PREFIX
|
||||
+ "job.task.estimator.exponential.smooth.lambda-ms";
|
||||
|
||||
public static final long DEFAULT_MR_AM_TASK_ESTIMATOR_SMNOOTH_LAMBDA_MS =
|
||||
public static final long DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS =
|
||||
1000L * 60;
|
||||
|
||||
/** true if the smoothing rate should be exponential.*/
|
||||
public static final String MR_AM_TASK_EXTIMATOR_EXPONENTIAL_RATE_ENABLE =
|
||||
public static final String MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE =
|
||||
MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate";
|
||||
|
||||
/** The number of threads used to handle task RPC calls.*/
|
||||
|
|
|
@ -104,7 +104,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
private Server server;
|
||||
private final ResourceLocalizationService rsrcLocalizationSrvc;
|
||||
private final ContainersLauncher containersLauncher;
|
||||
private final AuxServices auxiluaryServices;
|
||||
private final AuxServices auxiliaryServices;
|
||||
private final NodeManagerMetrics metrics;
|
||||
|
||||
private final NodeStatusUpdater nodeStatusUpdater;
|
||||
|
@ -137,9 +137,9 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||
|
||||
// Start configurable services
|
||||
auxiluaryServices = new AuxServices();
|
||||
auxiluaryServices.register(this);
|
||||
addService(auxiluaryServices);
|
||||
auxiliaryServices = new AuxServices();
|
||||
auxiliaryServices.register(this);
|
||||
addService(auxiliaryServices);
|
||||
|
||||
this.containersMonitor =
|
||||
new ContainersMonitorImpl(exec, dispatcher, this.context);
|
||||
|
@ -154,7 +154,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
dispatcher.register(ApplicationEventType.class,
|
||||
new ApplicationEventDispatcher());
|
||||
dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
|
||||
dispatcher.register(AuxServicesEventType.class, auxiluaryServices);
|
||||
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
|
||||
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
|
||||
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
|
||||
dispatcher.register(LogAggregatorEventType.class, logAggregationService);
|
||||
|
@ -213,8 +213,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (auxiluaryServices.getServiceState() == STARTED) {
|
||||
auxiluaryServices.unregister(this);
|
||||
if (auxiliaryServices.getServiceState() == STARTED) {
|
||||
auxiliaryServices.unregister(this);
|
||||
}
|
||||
if (server != null) {
|
||||
server.close();
|
||||
|
@ -285,7 +285,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
|
||||
StartContainerResponse response =
|
||||
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||
response.addAllServiceResponse(auxiluaryServices.getMeta());
|
||||
response.addAllServiceResponse(auxiliaryServices.getMeta());
|
||||
// TODO launchedContainer misplaced -> doesn't necessarily mean a container
|
||||
// launch. A finished Application will not launch containers.
|
||||
metrics.launchedContainer();
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
|
||||
|
@ -43,6 +44,10 @@ import org.apache.hadoop.yarn.state.StateMachine;
|
|||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
/**
|
||||
* The state machine for the representation of an Application
|
||||
* within the NodeManager.
|
||||
*/
|
||||
public class ApplicationImpl implements Application {
|
||||
|
||||
final Dispatcher dispatcher;
|
||||
|
@ -151,6 +156,9 @@ public class ApplicationImpl implements Application {
|
|||
|
||||
/**
|
||||
* Notify services of new application.
|
||||
*
|
||||
* In particular, this requests that the {@link ResourceLocalizationService}
|
||||
* localize the application-scoped resources.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
static class AppInitTransition implements
|
||||
|
|
|
@ -431,6 +431,20 @@ public class ContainerImpl implements Container {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* State transition when a NEW container receives the INIT_CONTAINER
|
||||
* message.
|
||||
*
|
||||
* If there are resources to localize, sends a
|
||||
* ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES)
|
||||
* to the ResourceLocalizationManager and enters LOCALIZING state.
|
||||
*
|
||||
* If there are no resources to localize, sends LAUNCH_CONTAINER event
|
||||
* and enters LOCALIZED state directly.
|
||||
*
|
||||
* If there are any invalid resources specified, enters LOCALIZATION_FAILED
|
||||
* directly.
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
static class RequestResourcesTransition implements
|
||||
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
|
||||
|
@ -513,6 +527,10 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition when one of the requested resources for this container
|
||||
* has been successfully localized.
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
static class LocalizedTransition implements
|
||||
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
|
||||
|
@ -540,6 +558,10 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition from LOCALIZED state to RUNNING state upon receiving
|
||||
* a CONTAINER_LAUNCHED event
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
static class LaunchTransition extends ContainerTransition {
|
||||
@Override
|
||||
|
@ -556,6 +578,10 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS state
|
||||
* upon EXITED_WITH_SUCCESS message.
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
static class ExitedWithSuccessTransition extends ContainerTransition {
|
||||
|
||||
|
@ -582,6 +608,10 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition to EXITED_WITH_FAILURE state upon
|
||||
* CONTAINER_EXITED_WITH_FAILURE state.
|
||||
**/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
static class ExitedWithFailureTransition extends ContainerTransition {
|
||||
|
||||
|
@ -609,6 +639,9 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
|
||||
*/
|
||||
static class KilledExternallyTransition extends ExitedWithFailureTransition {
|
||||
KilledExternallyTransition() {
|
||||
super(true);
|
||||
|
@ -621,6 +654,10 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving
|
||||
* RESOURCE_FAILED event.
|
||||
*/
|
||||
static class ResourceFailedTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
@Override
|
||||
|
@ -638,7 +675,11 @@ public class ContainerImpl implements Container {
|
|||
container.metrics.endInitingContainer();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Transition from LOCALIZING to KILLING upon receiving
|
||||
* KILL_CONTAINER event.
|
||||
*/
|
||||
static class KillDuringLocalizationTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
@Override
|
||||
|
@ -652,6 +693,10 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remain in KILLING state when receiving a RESOURCE_LOCALIZED request
|
||||
* while in the process of killing.
|
||||
*/
|
||||
static class LocalizedResourceDuringKillTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
@Override
|
||||
|
@ -669,6 +714,11 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions upon receiving KILL_CONTAINER:
|
||||
* - LOCALIZED -> KILLING
|
||||
* - RUNNING -> KILLING
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
static class KillTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
|
@ -683,6 +733,10 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
|
||||
* upon receiving CONTAINER_KILLED_ON_REQUEST.
|
||||
*/
|
||||
static class ContainerKilledTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
@Override
|
||||
|
@ -696,6 +750,13 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the following transitions:
|
||||
* - NEW -> DONE upon KILL_CONTAINER
|
||||
* - {LOCALIZATION_FAILED, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE,
|
||||
* KILLING, CONTAINER_CLEANEDUP_AFTER_KILL}
|
||||
* -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
|
||||
*/
|
||||
static class ContainerDoneTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
@Override
|
||||
|
@ -703,7 +764,10 @@ public class ContainerImpl implements Container {
|
|||
container.finished();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Update diagnostics, staying in the same state.
|
||||
*/
|
||||
static class ContainerDiagnosticsUpdateTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
@Override
|
||||
|
|
|
@ -112,7 +112,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
|
||||
|
||||
/**
|
||||
* Returns the path upto the random directory component.
|
||||
* Returns the path up to the random directory component.
|
||||
*/
|
||||
private Path getPathToDelete(Path localPath) {
|
||||
Path delPath = localPath.getParent();
|
||||
|
@ -121,7 +121,7 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
|
|||
if (matcher.matches()) {
|
||||
return delPath;
|
||||
} else {
|
||||
LOG.warn("Random directroy component did not match. " +
|
||||
LOG.warn("Random directory component did not match. " +
|
||||
"Deleting localized path only");
|
||||
return localPath;
|
||||
}
|
||||
|
|
|
@ -133,8 +133,18 @@ public class ResourceLocalizationService extends CompositeService
|
|||
private final ScheduledExecutorService cacheCleanup;
|
||||
|
||||
private final LocalResourcesTracker publicRsrc;
|
||||
|
||||
/**
|
||||
* Map of LocalResourceTrackers keyed by username, for private
|
||||
* resources.
|
||||
*/
|
||||
private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
|
||||
new ConcurrentHashMap<String,LocalResourcesTracker>();
|
||||
|
||||
/**
|
||||
* Map of LocalResourceTrackers keyed by appid, for application
|
||||
* resources.
|
||||
*/
|
||||
private final ConcurrentMap<String,LocalResourcesTracker> appRsrc =
|
||||
new ConcurrentHashMap<String,LocalResourcesTracker>();
|
||||
|
||||
|
@ -251,140 +261,167 @@ public class ResourceLocalizationService extends CompositeService
|
|||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
public void handle(LocalizationEvent event) {
|
||||
String userName;
|
||||
String appIDStr;
|
||||
Container c;
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs;
|
||||
LocalResourcesTracker tracker;
|
||||
// TODO: create log dir as $logdir/$user/$appId
|
||||
switch (event.getType()) {
|
||||
case INIT_APPLICATION_RESOURCES:
|
||||
Application app =
|
||||
((ApplicationLocalizationEvent)event).getApplication();
|
||||
// 0) Create application tracking structs
|
||||
userName = app.getUser();
|
||||
privateRsrc.putIfAbsent(userName,
|
||||
new LocalResourcesTrackerImpl(userName, dispatcher));
|
||||
if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
|
||||
new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
|
||||
LOG.warn("Initializing application " + app + " already present");
|
||||
assert false; // TODO: FIXME assert doesn't help
|
||||
// ^ The condition is benign. Tests should fail and it
|
||||
// should appear in logs, but it's an internal error
|
||||
// that should have no effect on applications
|
||||
}
|
||||
// 1) Signal container init
|
||||
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
|
||||
app.getAppId()));
|
||||
handleInitApplicationResources(
|
||||
((ApplicationLocalizationEvent)event).getApplication());
|
||||
break;
|
||||
case INIT_CONTAINER_RESOURCES:
|
||||
ContainerLocalizationRequestEvent rsrcReqs =
|
||||
(ContainerLocalizationRequestEvent) event;
|
||||
c = rsrcReqs.getContainer();
|
||||
LocalizerContext ctxt = new LocalizerContext(
|
||||
c.getUser(), c.getContainerID(), c.getCredentials());
|
||||
rsrcs = rsrcReqs.getRequestedResources();
|
||||
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
|
||||
rsrcs.entrySet()) {
|
||||
tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
|
||||
c.getContainerID().getApplicationAttemptId().getApplicationId());
|
||||
for (LocalResourceRequest req : e.getValue()) {
|
||||
tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
|
||||
}
|
||||
}
|
||||
handleInitContainerResources((ContainerLocalizationRequestEvent) event);
|
||||
break;
|
||||
case CACHE_CLEANUP:
|
||||
ResourceRetentionSet retain =
|
||||
new ResourceRetentionSet(delService, cacheTargetSize);
|
||||
retain.addResources(publicRsrc);
|
||||
LOG.debug("Resource cleanup (public) " + retain);
|
||||
for (LocalResourcesTracker t : privateRsrc.values()) {
|
||||
retain.addResources(t);
|
||||
LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
|
||||
}
|
||||
//TODO Check if appRsrcs should also be added to the retention set.
|
||||
handleCacheCleanup(event);
|
||||
break;
|
||||
case CLEANUP_CONTAINER_RESOURCES:
|
||||
ContainerLocalizationCleanupEvent rsrcCleanup =
|
||||
(ContainerLocalizationCleanupEvent) event;
|
||||
c = rsrcCleanup.getContainer();
|
||||
rsrcs = rsrcCleanup.getResources();
|
||||
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
|
||||
rsrcs.entrySet()) {
|
||||
tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
|
||||
c.getContainerID().getApplicationAttemptId().getApplicationId());
|
||||
for (LocalResourceRequest req : e.getValue()) {
|
||||
tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
|
||||
}
|
||||
}
|
||||
|
||||
// Delete the container directories
|
||||
userName = c.getUser();
|
||||
String containerIDStr = c.toString();
|
||||
appIDStr =
|
||||
ConverterUtils.toString(
|
||||
c.getContainerID().getApplicationAttemptId().getApplicationId());
|
||||
for (Path localDir : localDirs) {
|
||||
|
||||
// Delete the user-owned container-dir
|
||||
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
|
||||
Path userdir = new Path(usersdir, userName);
|
||||
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
|
||||
Path appDir = new Path(allAppsdir, appIDStr);
|
||||
Path containerDir = new Path(appDir, containerIDStr);
|
||||
delService.delete(userName, containerDir, new Path[] {});
|
||||
|
||||
// Delete the nmPrivate container-dir
|
||||
|
||||
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
|
||||
Path appSysDir = new Path(sysDir, appIDStr);
|
||||
Path containerSysDir = new Path(appSysDir, containerIDStr);
|
||||
delService.delete(null, containerSysDir, new Path[] {});
|
||||
}
|
||||
|
||||
dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
|
||||
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
|
||||
handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
|
||||
break;
|
||||
case DESTROY_APPLICATION_RESOURCES:
|
||||
|
||||
Application application =
|
||||
((ApplicationLocalizationEvent) event).getApplication();
|
||||
LocalResourcesTracker appLocalRsrcsTracker =
|
||||
appRsrc.remove(ConverterUtils.toString(application.getAppId()));
|
||||
if (null == appLocalRsrcsTracker) {
|
||||
LOG.warn("Removing uninitialized application " + application);
|
||||
}
|
||||
// TODO: What to do with appLocalRsrcsTracker?
|
||||
|
||||
// Delete the application directories
|
||||
userName = application.getUser();
|
||||
appIDStr = application.toString();
|
||||
for (Path localDir : localDirs) {
|
||||
|
||||
// Delete the user-owned app-dir
|
||||
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
|
||||
Path userdir = new Path(usersdir, userName);
|
||||
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
|
||||
Path appDir = new Path(allAppsdir, appIDStr);
|
||||
delService.delete(userName, appDir, new Path[] {});
|
||||
|
||||
// Delete the nmPrivate app-dir
|
||||
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
|
||||
Path appSysDir = new Path(sysDir, appIDStr);
|
||||
delService.delete(null, appSysDir, new Path[] {});
|
||||
}
|
||||
|
||||
// TODO: decrement reference counts of all resources associated with this
|
||||
// app
|
||||
|
||||
dispatcher.getEventHandler().handle(new ApplicationEvent(
|
||||
application.getAppId(),
|
||||
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
|
||||
handleDestroyApplicationResources(
|
||||
((ApplicationLocalizationEvent)event).getApplication());
|
||||
break;
|
||||
default:
|
||||
throw new YarnException("Unknown localization event: " + event);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle event received the first time any container is scheduled
|
||||
* by a given application.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private void handleInitApplicationResources(Application app) {
|
||||
// 0) Create application tracking structs
|
||||
String userName = app.getUser();
|
||||
privateRsrc.putIfAbsent(userName,
|
||||
new LocalResourcesTrackerImpl(userName, dispatcher));
|
||||
if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
|
||||
new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
|
||||
LOG.warn("Initializing application " + app + " already present");
|
||||
assert false; // TODO: FIXME assert doesn't help
|
||||
// ^ The condition is benign. Tests should fail and it
|
||||
// should appear in logs, but it's an internal error
|
||||
// that should have no effect on applications
|
||||
}
|
||||
// 1) Signal container init
|
||||
//
|
||||
// This is handled by the ApplicationImpl state machine and allows
|
||||
// containers to proceed with launching.
|
||||
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
|
||||
app.getAppId()));
|
||||
}
|
||||
|
||||
private void handleInitContainerResources(
|
||||
ContainerLocalizationRequestEvent rsrcReqs) {
|
||||
Container c = rsrcReqs.getContainer();
|
||||
LocalizerContext ctxt = new LocalizerContext(
|
||||
c.getUser(), c.getContainerID(), c.getCredentials());
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
|
||||
rsrcReqs.getRequestedResources();
|
||||
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
|
||||
rsrcs.entrySet()) {
|
||||
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
|
||||
c.getContainerID().getApplicationAttemptId().getApplicationId());
|
||||
for (LocalResourceRequest req : e.getValue()) {
|
||||
tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleCacheCleanup(LocalizationEvent event) {
|
||||
ResourceRetentionSet retain =
|
||||
new ResourceRetentionSet(delService, cacheTargetSize);
|
||||
retain.addResources(publicRsrc);
|
||||
LOG.debug("Resource cleanup (public) " + retain);
|
||||
for (LocalResourcesTracker t : privateRsrc.values()) {
|
||||
retain.addResources(t);
|
||||
LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
|
||||
}
|
||||
//TODO Check if appRsrcs should also be added to the retention set.
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void handleCleanupContainerResources(
|
||||
ContainerLocalizationCleanupEvent rsrcCleanup) {
|
||||
Container c = rsrcCleanup.getContainer();
|
||||
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
|
||||
rsrcCleanup.getResources();
|
||||
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
|
||||
rsrcs.entrySet()) {
|
||||
LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
|
||||
c.getContainerID().getApplicationAttemptId().getApplicationId());
|
||||
for (LocalResourceRequest req : e.getValue()) {
|
||||
tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
|
||||
}
|
||||
}
|
||||
|
||||
// Delete the container directories
|
||||
String userName = c.getUser();
|
||||
String containerIDStr = c.toString();
|
||||
String appIDStr = ConverterUtils.toString(
|
||||
c.getContainerID().getApplicationAttemptId().getApplicationId());
|
||||
for (Path localDir : localDirs) {
|
||||
|
||||
// Delete the user-owned container-dir
|
||||
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
|
||||
Path userdir = new Path(usersdir, userName);
|
||||
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
|
||||
Path appDir = new Path(allAppsdir, appIDStr);
|
||||
Path containerDir = new Path(appDir, containerIDStr);
|
||||
delService.delete(userName, containerDir, new Path[] {});
|
||||
|
||||
// Delete the nmPrivate container-dir
|
||||
|
||||
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
|
||||
Path appSysDir = new Path(sysDir, appIDStr);
|
||||
Path containerSysDir = new Path(appSysDir, containerIDStr);
|
||||
delService.delete(null, containerSysDir, new Path[] {});
|
||||
}
|
||||
|
||||
dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
|
||||
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
private void handleDestroyApplicationResources(Application application) {
|
||||
String userName;
|
||||
String appIDStr;
|
||||
LocalResourcesTracker appLocalRsrcsTracker =
|
||||
appRsrc.remove(ConverterUtils.toString(application.getAppId()));
|
||||
if (null == appLocalRsrcsTracker) {
|
||||
LOG.warn("Removing uninitialized application " + application);
|
||||
}
|
||||
// TODO: What to do with appLocalRsrcsTracker?
|
||||
|
||||
// Delete the application directories
|
||||
userName = application.getUser();
|
||||
appIDStr = application.toString();
|
||||
for (Path localDir : localDirs) {
|
||||
|
||||
// Delete the user-owned app-dir
|
||||
Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
|
||||
Path userdir = new Path(usersdir, userName);
|
||||
Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
|
||||
Path appDir = new Path(allAppsdir, appIDStr);
|
||||
delService.delete(userName, appDir, new Path[] {});
|
||||
|
||||
// Delete the nmPrivate app-dir
|
||||
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
|
||||
Path appSysDir = new Path(sysDir, appIDStr);
|
||||
delService.delete(null, appSysDir, new Path[] {});
|
||||
}
|
||||
|
||||
// TODO: decrement reference counts of all resources associated with this
|
||||
// app
|
||||
|
||||
dispatcher.getEventHandler().handle(new ApplicationEvent(
|
||||
application.getAppId(),
|
||||
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
|
||||
}
|
||||
|
||||
|
||||
LocalResourcesTracker getLocalResourcesTracker(
|
||||
LocalResourceVisibility visibility, String user, ApplicationId appId) {
|
||||
|
|
|
@ -22,8 +22,15 @@ import java.util.Map;
|
|||
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
|
||||
/**
|
||||
* Event that requests that the {@link ResourceLocalizationService} localize
|
||||
* a set of resources for the given container. This is generated by
|
||||
* {@link ContainerImpl} during container initialization.
|
||||
*/
|
||||
public class ContainerLocalizationRequestEvent extends
|
||||
ContainerLocalizationEvent {
|
||||
|
||||
|
|
|
@ -19,7 +19,11 @@
|
|||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
|
||||
|
||||
import org.apache.hadoop.yarn.event.AbstractEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
|
||||
/**
|
||||
* Events handled by {@link ResourceLocalizationService}
|
||||
*/
|
||||
public class LocalizationEvent extends AbstractEvent<LocalizationEventType> {
|
||||
|
||||
public LocalizationEvent(LocalizationEventType event) {
|
||||
|
|
Loading…
Reference in New Issue