YARN-2675. containersKilled metrics is not updated when the container is killed during localization. (Zhihai Xu via kasha)

(cherry picked from commit 954fb8581e)
This commit is contained in:
Karthik Kambatla 2014-12-19 16:00:24 -08:00
parent f9341c1e2c
commit b4e8ae591d
4 changed files with 159 additions and 51 deletions

View File

@ -218,6 +218,9 @@ Release 2.7.0 - UNRELEASED
YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie) YARN-2964. RM prematurely cancels tokens for jobs that submit jobs (oozie)
(Jian He via jlowe) (Jian He via jlowe)
YARN-2675. containersKilled metrics is not updated when the container is killed
during localization. (Zhihai Xu via kasha)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -159,9 +159,6 @@ public class ContainerImpl implements Container {
this.diagnostics.append(diagnostics); this.diagnostics.append(diagnostics);
} }
private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
new ContainerDoneTransition();
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
new ContainerDiagnosticsUpdateTransition(); new ContainerDiagnosticsUpdateTransition();
@ -202,7 +199,7 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.LOCALIZATION_FAILED, .addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.DONE, ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
CONTAINER_DONE_TRANSITION) new LocalizationFailedToDoneTransition())
.addTransition(ContainerState.LOCALIZATION_FAILED, .addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED, ContainerState.LOCALIZATION_FAILED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG, ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
@ -254,7 +251,7 @@ public class ContainerImpl implements Container {
// From CONTAINER_EXITED_WITH_SUCCESS State // From CONTAINER_EXITED_WITH_SUCCESS State
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE, .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
CONTAINER_DONE_TRANSITION) new ExitedWithSuccessToDoneTransition())
.addTransition(ContainerState.EXITED_WITH_SUCCESS, .addTransition(ContainerState.EXITED_WITH_SUCCESS,
ContainerState.EXITED_WITH_SUCCESS, ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG, ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
@ -266,7 +263,7 @@ public class ContainerImpl implements Container {
// From EXITED_WITH_FAILURE State // From EXITED_WITH_FAILURE State
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE, .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
CONTAINER_DONE_TRANSITION) new ExitedWithFailureToDoneTransition())
.addTransition(ContainerState.EXITED_WITH_FAILURE, .addTransition(ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_FAILURE, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG, ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
@ -301,7 +298,7 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.KILLING, .addTransition(ContainerState.KILLING,
ContainerState.DONE, ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
CONTAINER_DONE_TRANSITION) new KillingToDoneTransition())
// Handle a launched container during killing stage is a no-op // Handle a launched container during killing stage is a no-op
// as cleanup container is always handled after launch container event // as cleanup container is always handled after launch container event
// in the container launcher // in the container launcher
@ -313,7 +310,7 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.DONE, ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
CONTAINER_DONE_TRANSITION) new ContainerCleanedupAfterKillToDoneTransition())
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG, ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
@ -463,47 +460,6 @@ public class ContainerImpl implements Container {
} }
} }
@SuppressWarnings("fallthrough")
private void finished() {
ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId();
switch (getContainerState()) {
case EXITED_WITH_SUCCESS:
metrics.endRunningContainer();
metrics.completedContainer();
NMAuditLogger.logSuccess(user,
AuditConstants.FINISH_SUCCESS_CONTAINER, "ContainerImpl",
applicationId, containerId);
break;
case EXITED_WITH_FAILURE:
if (wasLaunched) {
metrics.endRunningContainer();
}
// fall through
case LOCALIZATION_FAILED:
metrics.failedContainer();
NMAuditLogger.logFailure(user,
AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
"Container failed with state: " + getContainerState(),
applicationId, containerId);
break;
case CONTAINER_CLEANEDUP_AFTER_KILL:
if (wasLaunched) {
metrics.endRunningContainer();
}
// fall through
case NEW:
metrics.killedContainer();
NMAuditLogger.logSuccess(user,
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
applicationId,
containerId);
}
metrics.releaseContainer(this.resource);
sendFinishedEvents();
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void sendFinishedEvents() { private void sendFinishedEvents() {
// Inform the application // Inform the application
@ -611,7 +567,13 @@ public class ContainerImpl implements Container {
} else if (container.recoveredAsKilled && } else if (container.recoveredAsKilled &&
container.recoveredStatus == RecoveredContainerStatus.REQUESTED) { container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
// container was killed but never launched // container was killed but never launched
container.finished(); container.metrics.killedContainer();
NMAuditLogger.logSuccess(container.user,
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
container.metrics.releaseContainer(container.resource);
container.sendFinishedEvents();
return ContainerState.DONE; return ContainerState.DONE;
} }
@ -994,7 +956,8 @@ public class ContainerImpl implements Container {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void transition(ContainerImpl container, ContainerEvent event) { public void transition(ContainerImpl container, ContainerEvent event) {
container.finished(); container.metrics.releaseContainer(container.resource);
container.sendFinishedEvents();
//if the current state is NEW it means the CONTAINER_INIT was never //if the current state is NEW it means the CONTAINER_INIT was never
// sent for the event, thus no need to send the CONTAINER_STOP // sent for the event, thus no need to send the CONTAINER_STOP
if (container.getCurrentState() if (container.getCurrentState()
@ -1016,6 +979,105 @@ public class ContainerImpl implements Container {
container.exitCode = killEvent.getContainerExitStatus(); container.exitCode = killEvent.getContainerExitStatus();
container.addDiagnostics(killEvent.getDiagnostic(), "\n"); container.addDiagnostics(killEvent.getDiagnostic(), "\n");
container.addDiagnostics("Container is killed before being launched.\n"); container.addDiagnostics("Container is killed before being launched.\n");
container.metrics.killedContainer();
NMAuditLogger.logSuccess(container.user,
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event);
}
}
/**
* Handle the following transition:
* - LOCALIZATION_FAILED -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class LocalizationFailedToDoneTransition extends
ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
container.metrics.failedContainer();
NMAuditLogger.logFailure(container.user,
AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
"Container failed with state: " + container.getContainerState(),
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event);
}
}
/**
* Handle the following transition:
* - EXITED_WITH_SUCCESS -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class ExitedWithSuccessToDoneTransition extends
ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
container.metrics.endRunningContainer();
container.metrics.completedContainer();
NMAuditLogger.logSuccess(container.user,
AuditConstants.FINISH_SUCCESS_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event);
}
}
/**
* Handle the following transition:
* - EXITED_WITH_FAILURE -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class ExitedWithFailureToDoneTransition extends
ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
if (container.wasLaunched) {
container.metrics.endRunningContainer();
}
container.metrics.failedContainer();
NMAuditLogger.logFailure(container.user,
AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
"Container failed with state: " + container.getContainerState(),
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event);
}
}
/**
* Handle the following transition:
* - KILLING -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class KillingToDoneTransition extends
ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
container.metrics.killedContainer();
NMAuditLogger.logSuccess(container.user,
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event);
}
}
/**
* Handle the following transition:
* CONTAINER_CLEANEDUP_AFTER_KILL -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
*/
static class ContainerCleanedupAfterKillToDoneTransition extends
ContainerDoneTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
if (container.wasLaunched) {
container.metrics.endRunningContainer();
}
container.metrics.killedContainer();
NMAuditLogger.logSuccess(container.user,
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
container.containerId.getApplicationAttemptId().getApplicationId(),
container.containerId);
super.transition(container, event); super.transition(container, event);
} }
} }

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import com.google.common.annotations.VisibleForTesting;
@Metrics(about="Metrics for node manager", context="yarn") @Metrics(about="Metrics for node manager", context="yarn")
public class NodeManagerMetrics { public class NodeManagerMetrics {
@Metric MutableCounterInt containersLaunched; @Metric MutableCounterInt containersLaunched;
@ -127,4 +129,18 @@ public class NodeManagerMetrics {
return containersRunning.value(); return containersRunning.value();
} }
@VisibleForTesting
public int getKilledContainers() {
return containersKilled.value();
}
@VisibleForTesting
public int getFailedContainers() {
return containersFailed.value();
}
@VisibleForTesting
public int getCompletedContainers() {
return containersCompleted.value();
}
} }

View File

@ -173,13 +173,20 @@ public class TestContainer {
wc = new WrappedContainer(13, 314159265358979L, 4344, "yak"); wc = new WrappedContainer(13, 314159265358979L, 4344, "yak");
wc.initContainer(); wc.initContainer();
wc.localizeResources(); wc.localizeResources();
int running = metrics.getRunningContainers();
wc.launchContainer(); wc.launchContainer();
assertEquals(running + 1, metrics.getRunningContainers());
reset(wc.localizerBus); reset(wc.localizerBus);
wc.containerKilledOnRequest(); wc.containerKilledOnRequest();
assertEquals(ContainerState.EXITED_WITH_FAILURE, assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState()); wc.c.getContainerState());
assertNull(wc.c.getLocalizedResources()); assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc); verifyCleanupCall(wc);
int failed = metrics.getFailedContainers();
wc.containerResourcesCleanup();
assertEquals(ContainerState.DONE, wc.c.getContainerState());
assertEquals(failed + 1, metrics.getFailedContainers());
assertEquals(running, metrics.getRunningContainers());
} }
finally { finally {
if (wc != null) { if (wc != null) {
@ -219,13 +226,20 @@ public class TestContainer {
wc = new WrappedContainer(11, 314159265358979L, 4344, "yak"); wc = new WrappedContainer(11, 314159265358979L, 4344, "yak");
wc.initContainer(); wc.initContainer();
wc.localizeResources(); wc.localizeResources();
int running = metrics.getRunningContainers();
wc.launchContainer(); wc.launchContainer();
assertEquals(running + 1, metrics.getRunningContainers());
reset(wc.localizerBus); reset(wc.localizerBus);
wc.containerSuccessful(); wc.containerSuccessful();
assertEquals(ContainerState.EXITED_WITH_SUCCESS, assertEquals(ContainerState.EXITED_WITH_SUCCESS,
wc.c.getContainerState()); wc.c.getContainerState());
assertNull(wc.c.getLocalizedResources()); assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc); verifyCleanupCall(wc);
int completed = metrics.getCompletedContainers();
wc.containerResourcesCleanup();
assertEquals(ContainerState.DONE, wc.c.getContainerState());
assertEquals(completed + 1, metrics.getCompletedContainers());
assertEquals(running, metrics.getRunningContainers());
} }
finally { finally {
if (wc != null) { if (wc != null) {
@ -319,12 +333,14 @@ public class TestContainer {
try { try {
wc = new WrappedContainer(13, 314159265358979L, 4344, "yak"); wc = new WrappedContainer(13, 314159265358979L, 4344, "yak");
assertEquals(ContainerState.NEW, wc.c.getContainerState()); assertEquals(ContainerState.NEW, wc.c.getContainerState());
int killed = metrics.getKilledContainers();
wc.killContainer(); wc.killContainer();
assertEquals(ContainerState.DONE, wc.c.getContainerState()); assertEquals(ContainerState.DONE, wc.c.getContainerState());
assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
wc.c.cloneAndGetContainerStatus().getExitStatus()); wc.c.cloneAndGetContainerStatus().getExitStatus());
assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics() assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics()
.contains("KillRequest")); .contains("KillRequest"));
assertEquals(killed + 1, metrics.getKilledContainers());
} finally { } finally {
if (wc != null) { if (wc != null) {
wc.finished(); wc.finished();
@ -345,6 +361,10 @@ public class TestContainer {
wc.c.cloneAndGetContainerStatus().getExitStatus()); wc.c.cloneAndGetContainerStatus().getExitStatus());
assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics() assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics()
.contains("KillRequest")); .contains("KillRequest"));
int killed = metrics.getKilledContainers();
wc.containerResourcesCleanup();
assertEquals(ContainerState.DONE, wc.c.getContainerState());
assertEquals(killed + 1, metrics.getKilledContainers());
} finally { } finally {
if (wc != null) { if (wc != null) {
wc.finished(); wc.finished();
@ -365,6 +385,10 @@ public class TestContainer {
assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState()); assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
assertNull(wc.c.getLocalizedResources()); assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc); verifyCleanupCall(wc);
int failed = metrics.getFailedContainers();
wc.containerResourcesCleanup();
assertEquals(ContainerState.DONE, wc.c.getContainerState());
assertEquals(failed + 1, metrics.getFailedContainers());
} finally { } finally {
if (wc != null) { if (wc != null) {
wc.finished(); wc.finished();
@ -389,8 +413,11 @@ public class TestContainer {
wc.c.getContainerState()); wc.c.getContainerState());
assertNull(wc.c.getLocalizedResources()); assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc); verifyCleanupCall(wc);
int killed = metrics.getKilledContainers();
wc.c.handle(new ContainerEvent(wc.c.getContainerId(), wc.c.handle(new ContainerEvent(wc.c.getContainerId(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
assertEquals(ContainerState.DONE, wc.c.getContainerState());
assertEquals(killed + 1, metrics.getKilledContainers());
assertEquals(0, metrics.getRunningContainers()); assertEquals(0, metrics.getRunningContainers());
} finally { } finally {
if (wc != null) { if (wc != null) {