diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 4aca0ea765a..a3236492135 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.service.ServiceScheduler; +import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; @@ -151,10 +152,19 @@ public class ComponentInstance implements EventHandler, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { // Query container status for ip and host + boolean cancelOnSuccess = true; + if (compInstance.getCompSpec().getArtifact() != null && compInstance + .getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) { + // A docker container might get a different IP if the container is + // relaunched by the NM, so we need to keep checking the status. + // This is a temporary fix until the NM provides a callback for + // container relaunch (see YARN-8265). + cancelOnSuccess = false; + } compInstance.containerStatusFuture = compInstance.scheduler.executorService.scheduleAtFixedRate( new ContainerStatusRetriever(compInstance.scheduler, - event.getContainerId(), compInstance), 0, 1, + event.getContainerId(), compInstance, cancelOnSuccess), 0, 1, TimeUnit.SECONDS); long containerStartTime = System.currentTimeMillis(); try { @@ -373,14 +383,26 @@ public class ComponentInstance implements EventHandler, this.status = status; org.apache.hadoop.yarn.service.api.records.Container container = getCompSpec().getContainer(status.getContainerId().toString()); + boolean doRegistryUpdate = true; if (container != null) { - container.setIp(StringUtils.join(",", status.getIPs())); + String existingIP = container.getIp(); + String newIP = StringUtils.join(",", status.getIPs()); + container.setIp(newIP); container.setHostname(status.getHost()); - if (timelineServiceEnabled) { + if (existingIP != null && newIP.equals(existingIP)) { + doRegistryUpdate = false; + } + if (timelineServiceEnabled && doRegistryUpdate) { serviceTimelinePublisher.componentInstanceIPHostUpdated(container); } } - updateServiceRecord(yarnRegistryOperations, status); + if (doRegistryUpdate) { + cleanupRegistry(status.getContainerId()); + LOG.info( + getCompInstanceId() + " new IP = " + status.getIPs() + ", host = " + + status.getHost() + ", updating registry"); + updateServiceRecord(yarnRegistryOperations, status); + } } public String getCompName() { @@ -522,12 +544,15 @@ public class ComponentInstance implements EventHandler, private NodeId nodeId; private NMClient nmClient; private ComponentInstance instance; + private boolean cancelOnSuccess; ContainerStatusRetriever(ServiceScheduler scheduler, - ContainerId containerId, ComponentInstance instance) { + ContainerId containerId, ComponentInstance instance, boolean + cancelOnSuccess) { this.containerId = containerId; this.nodeId = instance.getNodeId(); this.nmClient = scheduler.getNmClient().getClient(); this.instance = instance; + this.cancelOnSuccess = cancelOnSuccess; } @Override public void run() { ContainerStatus status = null; @@ -548,10 +573,12 @@ public class ComponentInstance implements EventHandler, return; } instance.updateContainerStatus(status); - LOG.info( - instance.compInstanceId + " IP = " + status.getIPs() + ", host = " - + status.getHost() + ", cancel container status retriever"); - instance.containerStatusFuture.cancel(false); + if (cancelOnSuccess) { + LOG.info( + instance.compInstanceId + " IP = " + status.getIPs() + ", host = " + + status.getHost() + ", cancel container status retriever"); + instance.containerStatusFuture.cancel(false); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java index 04b03477b63..4a75aefe058 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java @@ -317,6 +317,14 @@ public class MockServiceAM extends ServiceMaster { } } + public Container updateContainerStatus(Service service, int id, + String compName, String host) { + ContainerId containerId = createContainerId(id); + Container container = createContainer(containerId, compName); + addContainerStatus(container, ContainerState.RUNNING, host); + return container; + } + public ContainerId createContainerId(int id) { ApplicationId applicationId = ApplicationId.fromString(service.getId()); return ContainerId.newContainerId( @@ -389,10 +397,15 @@ public class MockServiceAM extends ServiceMaster { } private void addContainerStatus(Container container, ContainerState state) { + addContainerStatus(container, state, container.getNodeId().getHost()); + } + + private void addContainerStatus(Container container, ContainerState state, + String host) { ContainerStatus status = ContainerStatus.newInstance(container.getId(), state, "", 0); - status.setHost(container.getNodeId().getHost()); - status.setIPs(Lists.newArrayList(container.getNodeId().getHost())); + status.setHost(host); + status.setIPs(Lists.newArrayList(host)); containerStatuses.put(container.getId(), status); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java index 260976aabdd..e9478f0b7e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.DockerCredentialTokenIdentifier; +import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.api.records.Service; @@ -349,4 +350,45 @@ public class TestServiceAM extends ServiceTestUtils{ am.stop(); } + + @Test + public void testIPChange() throws TimeoutException, + InterruptedException { + ApplicationId applicationId = ApplicationId.newInstance(123456, 1); + String comp1Name = "comp1"; + String comp1InstName = "comp1-0"; + Service exampleApp = new Service(); + exampleApp.setId(applicationId.toString()); + exampleApp.setVersion("v1"); + exampleApp.setName("testIPChange"); + Component comp1 = createComponent(comp1Name, 1, "sleep 60"); + comp1.setArtifact(new Artifact().type(Artifact.TypeEnum.DOCKER)); + exampleApp.addComponent(comp1); + + MockServiceAM am = new MockServiceAM(exampleApp); + am.init(conf); + am.start(); + + ComponentInstance comp1inst0 = am.getCompInstance(comp1Name, comp1InstName); + // allocate a container + am.feedContainerToComp(exampleApp, 1, comp1Name); + GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus() != null, + 2000, 200000); + // first host status will match the container nodeId + Assert.assertEquals("localhost", + comp1inst0.getContainerStatus().getHost()); + + LOG.info("Change the IP and host"); + // change the container status + am.updateContainerStatus(exampleApp, 1, comp1Name, "new.host"); + GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus().getHost() + .equals("new.host"), 2000, 200000); + + LOG.info("Change the IP and host again"); + // change the container status + am.updateContainerStatus(exampleApp, 1, comp1Name, "newer.host"); + GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus().getHost() + .equals("newer.host"), 2000, 200000); + am.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/TestDockerClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/TestDockerClient.java new file mode 100644 index 00000000000..efd7db5f1cf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/TestDockerClient.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +/** Unit tests for DockerClient. */ +public class TestDockerClient { + private static final File TEST_ROOT_DIR = GenericTestUtils.getTestDir( + TestDockerClient.class.getName()); + + @Before + public void setup() { + TEST_ROOT_DIR.mkdirs(); + } + + @After + public void cleanup() { + FileUtil.fullyDelete(TEST_ROOT_DIR); + } + + @Test + public void testWriteCommandToTempFile() throws Exception { + String absRoot = TEST_ROOT_DIR.getAbsolutePath(); + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + DockerCommand dockerCmd = new DockerInspectCommand(cid.toString()); + Configuration conf = new Configuration(); + conf.set("hadoop.tmp.dir", absRoot); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, absRoot); + conf.set(YarnConfiguration.NM_LOG_DIRS, absRoot); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + Context mockContext = mock(Context.class); + doReturn(conf).when(mockContext).getConf(); + doReturn(dirsHandler).when(mockContext).getLocalDirsHandler(); + + DockerClient dockerClient = new DockerClient(conf); + dirsHandler.init(conf); + dirsHandler.start(); + String tmpPath = dockerClient.writeCommandToTempFile(dockerCmd, cid, + mockContext); + dirsHandler.stop(); + File tmpFile = new File(tmpPath); + assertTrue(tmpFile + " was not created", tmpFile.exists()); + } +}