YARN-9071. Improved status update for reinitialized containers.
Contributed by Chandni Singh
(cherry picked from commit 1b790f4dd1
)
This commit is contained in:
parent
293c992e81
commit
7ef4ff1905
|
@ -154,10 +154,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
REINITIALIZED), START, new StartedAfterUpgradeTransition())
|
REINITIALIZED), START, new StartedAfterUpgradeTransition())
|
||||||
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT),
|
.addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT),
|
||||||
STOP, new StoppedAfterCancelUpgradeTransition())
|
STOP, new StoppedAfterCancelUpgradeTransition())
|
||||||
|
|
||||||
|
// FROM REINITIALIZED
|
||||||
.addTransition(REINITIALIZED, CANCEL_UPGRADING, CANCEL_UPGRADE,
|
.addTransition(REINITIALIZED, CANCEL_UPGRADING, CANCEL_UPGRADE,
|
||||||
new CancelledAfterReinitTransition())
|
new CancelledAfterReinitTransition())
|
||||||
.addTransition(REINITIALIZED, READY, BECOME_READY,
|
.addTransition(REINITIALIZED, READY, BECOME_READY,
|
||||||
new ContainerBecomeReadyTransition(true))
|
new ContainerBecomeReadyTransition(true))
|
||||||
|
.addTransition(REINITIALIZED, REINITIALIZED, STOP,
|
||||||
|
new StoppedAfterUpgradeTransition())
|
||||||
.installTopology();
|
.installTopology();
|
||||||
|
|
||||||
public ComponentInstance(Component component,
|
public ComponentInstance(Component component,
|
||||||
|
@ -184,20 +188,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
@Override public void transition(ComponentInstance compInstance,
|
@Override public void transition(ComponentInstance compInstance,
|
||||||
ComponentInstanceEvent event) {
|
ComponentInstanceEvent event) {
|
||||||
// Query container status for ip and host
|
// Query container status for ip and host
|
||||||
boolean cancelOnSuccess = true;
|
compInstance.initializeStatusRetriever(event);
|
||||||
if (compInstance.getCompSpec().getArtifact() != null && compInstance
|
|
||||||
.getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
|
|
||||||
// A docker container might get a different IP if the container is
|
|
||||||
// relaunched by the NM, so we need to keep checking the status.
|
|
||||||
// This is a temporary fix until the NM provides a callback for
|
|
||||||
// container relaunch (see YARN-8265).
|
|
||||||
cancelOnSuccess = false;
|
|
||||||
}
|
|
||||||
compInstance.containerStatusFuture =
|
|
||||||
compInstance.scheduler.executorService.scheduleAtFixedRate(
|
|
||||||
new ContainerStatusRetriever(compInstance.scheduler,
|
|
||||||
event.getContainerId(), compInstance, cancelOnSuccess), 0, 1,
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
long containerStartTime = System.currentTimeMillis();
|
long containerStartTime = System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
|
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
|
||||||
|
@ -277,6 +268,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
|
|
||||||
instance.upgradeInProgress.set(false);
|
instance.upgradeInProgress.set(false);
|
||||||
instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
|
instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
|
||||||
|
instance.initializeStatusRetriever(event);
|
||||||
|
|
||||||
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
|
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
|
||||||
instance.component.getUpgradeStatus() :
|
instance.component.getUpgradeStatus() :
|
||||||
|
@ -572,13 +564,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
instance.setContainerState(ContainerState.UPGRADING);
|
instance.setContainerState(ContainerState.UPGRADING);
|
||||||
instance.component.decContainersReady(false);
|
instance.component.decContainersReady(false);
|
||||||
|
|
||||||
Component.UpgradeStatus status = instance.component.getUpgradeStatus();
|
Component.UpgradeStatus upgradeStatus = instance.component.
|
||||||
instance.scheduler.getContainerLaunchService()
|
getUpgradeStatus();
|
||||||
.reInitCompInstance(instance.scheduler.getApp(), instance,
|
instance.reInitHelper(upgradeStatus);
|
||||||
instance.container,
|
|
||||||
instance.component.createLaunchContext(
|
|
||||||
status.getTargetSpec(),
|
|
||||||
status.getTargetVersion()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -634,11 +622,35 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
LOG.info("{} cancelling upgrade", container.getId());
|
LOG.info("{} cancelling upgrade", container.getId());
|
||||||
setContainerState(ContainerState.UPGRADING);
|
setContainerState(ContainerState.UPGRADING);
|
||||||
Component.UpgradeStatus cancelStatus = component.getCancelUpgradeStatus();
|
Component.UpgradeStatus cancelStatus = component.getCancelUpgradeStatus();
|
||||||
|
reInitHelper(cancelStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void reInitHelper(Component.UpgradeStatus upgradeStatus) {
|
||||||
|
cancelContainerStatusRetriever();
|
||||||
|
setContainerStatus(null);
|
||||||
|
scheduler.executorService.submit(() -> cleanupRegistry(container.getId()));
|
||||||
scheduler.getContainerLaunchService()
|
scheduler.getContainerLaunchService()
|
||||||
.reInitCompInstance(scheduler.getApp(), this,
|
.reInitCompInstance(scheduler.getApp(), this,
|
||||||
this.container, this.component.createLaunchContext(
|
this.container, this.component.createLaunchContext(
|
||||||
cancelStatus.getTargetSpec(),
|
upgradeStatus.getTargetSpec(),
|
||||||
cancelStatus.getTargetVersion()));
|
upgradeStatus.getTargetVersion()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeStatusRetriever(ComponentInstanceEvent event) {
|
||||||
|
boolean cancelOnSuccess = true;
|
||||||
|
if (getCompSpec().getArtifact() != null &&
|
||||||
|
getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
|
||||||
|
// A docker container might get a different IP if the container is
|
||||||
|
// relaunched by the NM, so we need to keep checking the status.
|
||||||
|
// This is a temporary fix until the NM provides a callback for
|
||||||
|
// container relaunch (see YARN-8265).
|
||||||
|
cancelOnSuccess = false;
|
||||||
|
}
|
||||||
|
containerStatusFuture =
|
||||||
|
scheduler.executorService.scheduleAtFixedRate(
|
||||||
|
new ContainerStatusRetriever(scheduler, event.getContainerId(),
|
||||||
|
this, cancelOnSuccess), 0, 1,
|
||||||
|
TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ComponentInstanceState getState() {
|
public ComponentInstanceState getState() {
|
||||||
|
@ -725,11 +737,25 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
public ContainerStatus getContainerStatus() {
|
public ContainerStatus getContainerStatus() {
|
||||||
return status;
|
try {
|
||||||
|
readLock.lock();
|
||||||
|
return status;
|
||||||
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setContainerStatus(ContainerStatus latestStatus) {
|
||||||
|
try {
|
||||||
|
writeLock.lock();
|
||||||
|
this.status = latestStatus;
|
||||||
|
} finally {
|
||||||
|
writeLock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateContainerStatus(ContainerStatus status) {
|
public void updateContainerStatus(ContainerStatus status) {
|
||||||
this.status = status;
|
setContainerStatus(status);
|
||||||
org.apache.hadoop.yarn.service.api.records.Container container =
|
org.apache.hadoop.yarn.service.api.records.Container container =
|
||||||
getCompSpec().getContainer(status.getContainerId().toString());
|
getCompSpec().getContainer(status.getContainerId().toString());
|
||||||
boolean doRegistryUpdate = true;
|
boolean doRegistryUpdate = true;
|
||||||
|
|
|
@ -140,6 +140,42 @@ public class TestComponentInstance {
|
||||||
.getId().toString()).getState());
|
.getId().toString()).getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFailureAfterReinit() throws Exception {
|
||||||
|
ServiceContext context = TestComponent.createTestContext(rule,
|
||||||
|
"testContainerUpgradeFailed");
|
||||||
|
Component component = context.scheduler.getAllComponents().entrySet()
|
||||||
|
.iterator().next().getValue();
|
||||||
|
upgradeComponent(component);
|
||||||
|
|
||||||
|
ComponentInstance instance = component.getAllComponentInstances().iterator()
|
||||||
|
.next();
|
||||||
|
|
||||||
|
ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent(
|
||||||
|
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
|
||||||
|
instance.handle(upgradeEvent);
|
||||||
|
|
||||||
|
// NM finished updgrae
|
||||||
|
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
|
||||||
|
ComponentInstanceEventType.START));
|
||||||
|
Assert.assertEquals("instance not running",
|
||||||
|
ContainerState.RUNNING_BUT_UNREADY,
|
||||||
|
component.getComponentSpec().getContainer(instance.getContainer()
|
||||||
|
.getId().toString()).getState());
|
||||||
|
|
||||||
|
ContainerStatus containerStatus = mock(ContainerStatus.class);
|
||||||
|
when(containerStatus.getExitStatus()).thenReturn(
|
||||||
|
ContainerExitStatus.ABORTED);
|
||||||
|
ComponentInstanceEvent stopEvent = new ComponentInstanceEvent(
|
||||||
|
instance.getContainer().getId(), ComponentInstanceEventType.STOP)
|
||||||
|
.setStatus(containerStatus);
|
||||||
|
// this is the call back from NM for the upgrade
|
||||||
|
instance.handle(stopEvent);
|
||||||
|
Assert.assertEquals("instance did not fail", ContainerState.FAILED_UPGRADE,
|
||||||
|
component.getComponentSpec().getContainer(instance.getContainer()
|
||||||
|
.getId().toString()).getState());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCancelNothingToUpgrade() throws Exception {
|
public void testCancelNothingToUpgrade() throws Exception {
|
||||||
ServiceContext context = TestComponent.createTestContext(rule,
|
ServiceContext context = TestComponent.createTestContext(rule,
|
||||||
|
|
|
@ -929,10 +929,21 @@ public class ContainerImpl implements Container {
|
||||||
this.workDir = workDir;
|
this.workDir = workDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void clearIpAndHost() {
|
||||||
|
LOG.info("{} clearing ip and host", containerId);
|
||||||
|
this.ips = null;
|
||||||
|
this.host = null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setIpAndHost(String[] ipAndHost) {
|
public void setIpAndHost(String[] ipAndHost) {
|
||||||
this.ips = ipAndHost[0];
|
try {
|
||||||
this.host = ipAndHost[1];
|
this.writeLock.lock();
|
||||||
|
this.ips = ipAndHost[0];
|
||||||
|
this.host = ipAndHost[1];
|
||||||
|
} finally {
|
||||||
|
this.writeLock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1722,7 +1733,11 @@ public class ContainerImpl implements Container {
|
||||||
+ "] for re-initialization !!");
|
+ "] for re-initialization !!");
|
||||||
container.wasLaunched = false;
|
container.wasLaunched = false;
|
||||||
container.metrics.endRunningContainer();
|
container.metrics.endRunningContainer();
|
||||||
|
container.clearIpAndHost();
|
||||||
|
// Remove the container from the resource-monitor. When container
|
||||||
|
// is launched again, it is added back to monitoring service.
|
||||||
|
container.dispatcher.getEventHandler().handle(
|
||||||
|
new ContainerStopMonitoringEvent(container.containerId, true));
|
||||||
container.launchContext = container.reInitContext.newLaunchContext;
|
container.launchContext = container.reInitContext.newLaunchContext;
|
||||||
|
|
||||||
// Re configure the Retry Context
|
// Re configure the Retry Context
|
||||||
|
@ -1886,7 +1901,7 @@ public class ContainerImpl implements Container {
|
||||||
if (container.containerMetrics != null) {
|
if (container.containerMetrics != null) {
|
||||||
container.containerMetrics
|
container.containerMetrics
|
||||||
.recordFinishTimeAndExitCode(clock.getTime(), container.exitCode);
|
.recordFinishTimeAndExitCode(clock.getTime(), container.exitCode);
|
||||||
container.containerMetrics.finished();
|
container.containerMetrics.finished(false);
|
||||||
}
|
}
|
||||||
container.sendFinishedEvents();
|
container.sendFinishedEvents();
|
||||||
|
|
||||||
|
|
|
@ -242,14 +242,18 @@ public class ContainerMetrics implements MetricsSource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void finished() {
|
public synchronized void finished(boolean unregisterWithoutDelay) {
|
||||||
if (!finished) {
|
if (!finished) {
|
||||||
this.finished = true;
|
this.finished = true;
|
||||||
if (timer != null) {
|
if (timer != null) {
|
||||||
timer.cancel();
|
timer.cancel();
|
||||||
timer = null;
|
timer = null;
|
||||||
}
|
}
|
||||||
scheduleTimerTaskForUnregistration();
|
if (!unregisterWithoutDelay) {
|
||||||
|
scheduleTimerTaskForUnregistration();
|
||||||
|
} else {
|
||||||
|
ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this);
|
||||||
|
}
|
||||||
this.pMemMBQuantiles.stop();
|
this.pMemMBQuantiles.stop();
|
||||||
this.cpuCoreUsagePercentQuantiles.stop();
|
this.cpuCoreUsagePercentQuantiles.stop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,8 +22,20 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
|
||||||
public class ContainerStopMonitoringEvent extends ContainersMonitorEvent {
|
public class ContainerStopMonitoringEvent extends ContainersMonitorEvent {
|
||||||
|
|
||||||
|
private final boolean forReInit;
|
||||||
|
|
||||||
public ContainerStopMonitoringEvent(ContainerId containerId) {
|
public ContainerStopMonitoringEvent(ContainerId containerId) {
|
||||||
super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
|
super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
|
||||||
|
forReInit = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ContainerStopMonitoringEvent(ContainerId containerId,
|
||||||
|
boolean forReInit) {
|
||||||
|
super(containerId, ContainersMonitorEventType.STOP_MONITORING_CONTAINER);
|
||||||
|
this.forReInit = forReInit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isForReInit() {
|
||||||
|
return forReInit;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -487,7 +487,7 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// Log the exception and proceed to the next container.
|
// Log the exception and proceed to the next container.
|
||||||
LOG.warn("Uncaught exception in ContainersMonitorImpl "
|
LOG.warn("Uncaught exception in ContainersMonitorImpl "
|
||||||
+ "while monitoring resource of " + containerId, e);
|
+ "while monitoring resource of {}", containerId, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -805,10 +805,12 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||||
vmemLimitMBs, pmemLimitMBs, cpuVcores);
|
vmemLimitMBs, pmemLimitMBs, cpuVcores);
|
||||||
break;
|
break;
|
||||||
case STOP_MONITORING_CONTAINER:
|
case STOP_MONITORING_CONTAINER:
|
||||||
|
ContainerStopMonitoringEvent stopEvent =
|
||||||
|
(ContainerStopMonitoringEvent) monitoringEvent;
|
||||||
usageMetrics = ContainerMetrics.getContainerMetrics(
|
usageMetrics = ContainerMetrics.getContainerMetrics(
|
||||||
containerId);
|
containerId);
|
||||||
if (usageMetrics != null) {
|
if (usageMetrics != null) {
|
||||||
usageMetrics.finished();
|
usageMetrics.finished(stopEvent.isForReInit());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case CHANGE_MONITORING_CONTAINER_RESOURCE:
|
case CHANGE_MONITORING_CONTAINER_RESOURCE:
|
||||||
|
|
|
@ -63,7 +63,7 @@ public class TestContainerMetrics {
|
||||||
assertEquals(ERR, 1, collector.getRecords().size());
|
assertEquals(ERR, 1, collector.getRecords().size());
|
||||||
collector.clear();
|
collector.clear();
|
||||||
|
|
||||||
metrics.finished();
|
metrics.finished(false);
|
||||||
metrics.getMetrics(collector, true);
|
metrics.getMetrics(collector, true);
|
||||||
assertEquals(ERR, 1, collector.getRecords().size());
|
assertEquals(ERR, 1, collector.getRecords().size());
|
||||||
collector.clear();
|
collector.clear();
|
||||||
|
@ -137,8 +137,8 @@ public class TestContainerMetrics {
|
||||||
ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3);
|
ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3);
|
||||||
ContainerMetrics metrics3 = ContainerMetrics.forContainer(system,
|
ContainerMetrics metrics3 = ContainerMetrics.forContainer(system,
|
||||||
containerId3, 1, 0);
|
containerId3, 1, 0);
|
||||||
metrics1.finished();
|
metrics1.finished(false);
|
||||||
metrics2.finished();
|
metrics2.finished(false);
|
||||||
system.sampleMetrics();
|
system.sampleMetrics();
|
||||||
system.sampleMetrics();
|
system.sampleMetrics();
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
|
|
Loading…
Reference in New Issue