YARN-9084. Reset container state and defer readiness check for upgrade.
Contributed by Chandni Singh
(cherry picked from commit ccdd982e51
)
This commit is contained in:
parent
b4fa1830a8
commit
956e6cfee6
|
@ -68,12 +68,6 @@ public interface UpgradeComponentsFinder {
|
||||||
"not supported by upgrade");
|
"not supported by upgrade");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!Objects.equals(currentDef.getQuicklinks(),
|
|
||||||
targetDef.getQuicklinks())) {
|
|
||||||
throw new UnsupportedOperationException("changes to quick links " +
|
|
||||||
"not supported by upgrade");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!Objects.equals(currentDef.getLaunchTime(),
|
if (!Objects.equals(currentDef.getLaunchTime(),
|
||||||
targetDef.getLaunchTime())) {
|
targetDef.getLaunchTime())) {
|
||||||
throw new UnsupportedOperationException("changes to launch time " +
|
throw new UnsupportedOperationException("changes to launch time " +
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.service.component.Component;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
||||||
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
|
import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy;
|
||||||
|
import org.apache.hadoop.yarn.service.monitor.probe.DefaultProbe;
|
||||||
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
|
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
|
||||||
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
||||||
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
|
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
|
||||||
|
@ -188,7 +189,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
@Override public void transition(ComponentInstance compInstance,
|
@Override public void transition(ComponentInstance compInstance,
|
||||||
ComponentInstanceEvent event) {
|
ComponentInstanceEvent event) {
|
||||||
// Query container status for ip and host
|
// Query container status for ip and host
|
||||||
compInstance.initializeStatusRetriever(event);
|
compInstance.initializeStatusRetriever(event, 0);
|
||||||
long containerStartTime = System.currentTimeMillis();
|
long containerStartTime = System.currentTimeMillis();
|
||||||
try {
|
try {
|
||||||
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
|
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
|
||||||
|
@ -268,7 +269,12 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
|
|
||||||
instance.upgradeInProgress.set(false);
|
instance.upgradeInProgress.set(false);
|
||||||
instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
|
instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY);
|
||||||
instance.initializeStatusRetriever(event);
|
if (instance.component.getProbe() != null &&
|
||||||
|
instance.component.getProbe() instanceof DefaultProbe) {
|
||||||
|
instance.initializeStatusRetriever(event, 30);
|
||||||
|
} else {
|
||||||
|
instance.initializeStatusRetriever(event, 0);
|
||||||
|
}
|
||||||
|
|
||||||
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
|
Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ?
|
||||||
instance.component.getUpgradeStatus() :
|
instance.component.getUpgradeStatus() :
|
||||||
|
@ -627,7 +633,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
|
|
||||||
private void reInitHelper(Component.UpgradeStatus upgradeStatus) {
|
private void reInitHelper(Component.UpgradeStatus upgradeStatus) {
|
||||||
cancelContainerStatusRetriever();
|
cancelContainerStatusRetriever();
|
||||||
setContainerStatus(null);
|
setContainerStatus(container.getId(), null);
|
||||||
scheduler.executorService.submit(() -> cleanupRegistry(container.getId()));
|
scheduler.executorService.submit(() -> cleanupRegistry(container.getId()));
|
||||||
scheduler.getContainerLaunchService()
|
scheduler.getContainerLaunchService()
|
||||||
.reInitCompInstance(scheduler.getApp(), this,
|
.reInitCompInstance(scheduler.getApp(), this,
|
||||||
|
@ -636,7 +642,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
upgradeStatus.getTargetVersion()));
|
upgradeStatus.getTargetVersion()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initializeStatusRetriever(ComponentInstanceEvent event) {
|
private void initializeStatusRetriever(ComponentInstanceEvent event,
|
||||||
|
long initialDelay) {
|
||||||
boolean cancelOnSuccess = true;
|
boolean cancelOnSuccess = true;
|
||||||
if (getCompSpec().getArtifact() != null &&
|
if (getCompSpec().getArtifact() != null &&
|
||||||
getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
|
getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
|
||||||
|
@ -646,10 +653,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
// container relaunch (see YARN-8265).
|
// container relaunch (see YARN-8265).
|
||||||
cancelOnSuccess = false;
|
cancelOnSuccess = false;
|
||||||
}
|
}
|
||||||
|
LOG.info("{} retrieve status after {}", compInstanceId, initialDelay);
|
||||||
containerStatusFuture =
|
containerStatusFuture =
|
||||||
scheduler.executorService.scheduleAtFixedRate(
|
scheduler.executorService.scheduleAtFixedRate(
|
||||||
new ContainerStatusRetriever(scheduler, event.getContainerId(),
|
new ContainerStatusRetriever(scheduler, event.getContainerId(),
|
||||||
this, cancelOnSuccess), 0, 1,
|
this, cancelOnSuccess), initialDelay, 1,
|
||||||
TimeUnit.SECONDS);
|
TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -745,32 +753,44 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setContainerStatus(ContainerStatus latestStatus) {
|
private void setContainerStatus(ContainerId containerId,
|
||||||
|
ContainerStatus latestStatus) {
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
this.status = latestStatus;
|
this.status = latestStatus;
|
||||||
|
org.apache.hadoop.yarn.service.api.records.Container containerRec =
|
||||||
|
getCompSpec().getContainer(containerId.toString());
|
||||||
|
|
||||||
|
if (containerRec != null) {
|
||||||
|
if (latestStatus != null) {
|
||||||
|
containerRec.setIp(StringUtils.join(",", latestStatus.getIPs()));
|
||||||
|
containerRec.setHostname(latestStatus.getHost());
|
||||||
|
} else {
|
||||||
|
containerRec.setIp(null);
|
||||||
|
containerRec.setHostname(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateContainerStatus(ContainerStatus status) {
|
public void updateContainerStatus(ContainerStatus status) {
|
||||||
setContainerStatus(status);
|
org.apache.hadoop.yarn.service.api.records.Container containerRec =
|
||||||
org.apache.hadoop.yarn.service.api.records.Container container =
|
|
||||||
getCompSpec().getContainer(status.getContainerId().toString());
|
getCompSpec().getContainer(status.getContainerId().toString());
|
||||||
boolean doRegistryUpdate = true;
|
boolean doRegistryUpdate = true;
|
||||||
if (container != null) {
|
if (containerRec != null) {
|
||||||
String existingIP = container.getIp();
|
String existingIP = containerRec.getIp();
|
||||||
String newIP = StringUtils.join(",", status.getIPs());
|
String newIP = StringUtils.join(",", status.getIPs());
|
||||||
container.setIp(newIP);
|
|
||||||
container.setHostname(status.getHost());
|
|
||||||
if (existingIP != null && newIP.equals(existingIP)) {
|
if (existingIP != null && newIP.equals(existingIP)) {
|
||||||
doRegistryUpdate = false;
|
doRegistryUpdate = false;
|
||||||
}
|
}
|
||||||
if (timelineServiceEnabled && doRegistryUpdate) {
|
|
||||||
serviceTimelinePublisher.componentInstanceIPHostUpdated(container);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
setContainerStatus(status.getContainerId(), status);
|
||||||
|
if (containerRec != null && timelineServiceEnabled && doRegistryUpdate) {
|
||||||
|
serviceTimelinePublisher.componentInstanceIPHostUpdated(containerRec);
|
||||||
|
}
|
||||||
|
|
||||||
if (doRegistryUpdate) {
|
if (doRegistryUpdate) {
|
||||||
cleanupRegistry(status.getContainerId());
|
cleanupRegistry(status.getContainerId());
|
||||||
LOG.info(
|
LOG.info(
|
||||||
|
|
|
@ -363,8 +363,12 @@ public class TestYarnNativeServices extends ServiceTestUtils {
|
||||||
|
|
||||||
Multimap<String, String> containersAfterFailure = waitForAllCompToBeReady(
|
Multimap<String, String> containersAfterFailure = waitForAllCompToBeReady(
|
||||||
client, exampleApp);
|
client, exampleApp);
|
||||||
Assert.assertEquals("component container affected by restart",
|
containersBeforeFailure.keys().forEach(compName -> {
|
||||||
containersBeforeFailure, containersAfterFailure);
|
Assert.assertEquals("num containers after by restart for " + compName,
|
||||||
|
containersBeforeFailure.get(compName).size(),
|
||||||
|
containersAfterFailure.get(compName) == null ? 0 :
|
||||||
|
containersAfterFailure.get(compName).size());
|
||||||
|
});
|
||||||
|
|
||||||
LOG.info("Stop/destroy service {}", exampleApp);
|
LOG.info("Stop/destroy service {}", exampleApp);
|
||||||
client.actionStop(exampleApp.getName(), true);
|
client.actionStop(exampleApp.getName(), true);
|
||||||
|
|
Loading…
Reference in New Issue