From 4e1cef3625af1b88a3d1f0e422c06e3dc3b21105 Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Tue, 18 Dec 2018 18:02:03 -0500 Subject: [PATCH] YARN-9084. Reset container state and defer readiness check for upgrade. Contributed by Chandni Singh (cherry picked from commit ccdd982e51f4413bc2e98d03e0a05b1133042a31) --- .../yarn/service/UpgradeComponentsFinder.java | 6 --- .../component/instance/ComponentInstance.java | 50 +++++++++++++------ .../yarn/service/TestYarnNativeServices.java | 8 ++- 3 files changed, 41 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java index 96a34f4d3e0..7a88ccf46c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java @@ -68,12 +68,6 @@ public interface UpgradeComponentsFinder { "not supported by upgrade"); } - if (!Objects.equals(currentDef.getQuicklinks(), - targetDef.getQuicklinks())) { - throw new UnsupportedOperationException("changes to quick links " + - "not supported by upgrade"); - } - if (!Objects.equals(currentDef.getLaunchTime(), targetDef.getLaunchTime())) { throw new UnsupportedOperationException("changes to launch time " + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 0145847d1bd..e9ffec5daf8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy; +import org.apache.hadoop.yarn.service.monitor.probe.DefaultProbe; import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; @@ -188,7 +189,7 @@ public class ComponentInstance implements EventHandler, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { // Query container status for ip and host - compInstance.initializeStatusRetriever(event); + compInstance.initializeStatusRetriever(event, 0); long containerStartTime = System.currentTimeMillis(); try { ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils @@ -268,7 +269,12 @@ public class ComponentInstance implements EventHandler, instance.upgradeInProgress.set(false); instance.setContainerState(ContainerState.RUNNING_BUT_UNREADY); - instance.initializeStatusRetriever(event); + if (instance.component.getProbe() != null && + instance.component.getProbe() instanceof DefaultProbe) { + instance.initializeStatusRetriever(event, 30); + } else { + instance.initializeStatusRetriever(event, 0); + } Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ? instance.component.getUpgradeStatus() : @@ -627,7 +633,7 @@ public class ComponentInstance implements EventHandler, private void reInitHelper(Component.UpgradeStatus upgradeStatus) { cancelContainerStatusRetriever(); - setContainerStatus(null); + setContainerStatus(container.getId(), null); scheduler.executorService.submit(() -> cleanupRegistry(container.getId())); scheduler.getContainerLaunchService() .reInitCompInstance(scheduler.getApp(), this, @@ -636,7 +642,8 @@ public class ComponentInstance implements EventHandler, upgradeStatus.getTargetVersion())); } - private void initializeStatusRetriever(ComponentInstanceEvent event) { + private void initializeStatusRetriever(ComponentInstanceEvent event, + long initialDelay) { boolean cancelOnSuccess = true; if (getCompSpec().getArtifact() != null && getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) { @@ -646,10 +653,11 @@ public class ComponentInstance implements EventHandler, // container relaunch (see YARN-8265). cancelOnSuccess = false; } + LOG.info("{} retrieve status after {}", compInstanceId, initialDelay); containerStatusFuture = scheduler.executorService.scheduleAtFixedRate( new ContainerStatusRetriever(scheduler, event.getContainerId(), - this, cancelOnSuccess), 0, 1, + this, cancelOnSuccess), initialDelay, 1, TimeUnit.SECONDS); } @@ -745,32 +753,44 @@ public class ComponentInstance implements EventHandler, } } - private void setContainerStatus(ContainerStatus latestStatus) { + private void setContainerStatus(ContainerId containerId, + ContainerStatus latestStatus) { try { writeLock.lock(); this.status = latestStatus; + org.apache.hadoop.yarn.service.api.records.Container containerRec = + getCompSpec().getContainer(containerId.toString()); + + if (containerRec != null) { + if (latestStatus != null) { + containerRec.setIp(StringUtils.join(",", latestStatus.getIPs())); + containerRec.setHostname(latestStatus.getHost()); + } else { + containerRec.setIp(null); + containerRec.setHostname(null); + } + } } finally { writeLock.unlock(); } } public void updateContainerStatus(ContainerStatus status) { - setContainerStatus(status); - org.apache.hadoop.yarn.service.api.records.Container container = + org.apache.hadoop.yarn.service.api.records.Container containerRec = getCompSpec().getContainer(status.getContainerId().toString()); boolean doRegistryUpdate = true; - if (container != null) { - String existingIP = container.getIp(); + if (containerRec != null) { + String existingIP = containerRec.getIp(); String newIP = StringUtils.join(",", status.getIPs()); - container.setIp(newIP); - container.setHostname(status.getHost()); if (existingIP != null && newIP.equals(existingIP)) { doRegistryUpdate = false; } - if (timelineServiceEnabled && doRegistryUpdate) { - serviceTimelinePublisher.componentInstanceIPHostUpdated(container); - } } + setContainerStatus(status.getContainerId(), status); + if (containerRec != null && timelineServiceEnabled && doRegistryUpdate) { + serviceTimelinePublisher.componentInstanceIPHostUpdated(containerRec); + } + if (doRegistryUpdate) { cleanupRegistry(status.getContainerId()); LOG.info( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 31a95b0c942..a22ada443c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -363,8 +363,12 @@ public class TestYarnNativeServices extends ServiceTestUtils { Multimap containersAfterFailure = waitForAllCompToBeReady( client, exampleApp); - Assert.assertEquals("component container affected by restart", - containersBeforeFailure, containersAfterFailure); + containersBeforeFailure.keys().forEach(compName -> { + Assert.assertEquals("num containers after by restart for " + compName, + containersBeforeFailure.get(compName).size(), + containersAfterFailure.get(compName) == null ? 0 : + containersAfterFailure.get(compName).size()); + }); LOG.info("Stop/destroy service {}", exampleApp); client.actionStop(exampleApp.getName(), true);