YARN-8265. Improve DNS handling on docker IP changes.
Contributed by Billie Rinaldi
This commit is contained in:
parent
6c8e51ca7e
commit
0ff94563b9
|
@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.service.ServiceScheduler;
|
||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.service.component.Component;
|
||||
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
||||
|
@ -151,10 +152,19 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|||
@Override public void transition(ComponentInstance compInstance,
|
||||
ComponentInstanceEvent event) {
|
||||
// Query container status for ip and host
|
||||
boolean cancelOnSuccess = true;
|
||||
if (compInstance.getCompSpec().getArtifact() != null && compInstance
|
||||
.getCompSpec().getArtifact().getType() == Artifact.TypeEnum.DOCKER) {
|
||||
// A docker container might get a different IP if the container is
|
||||
// relaunched by the NM, so we need to keep checking the status.
|
||||
// This is a temporary fix until the NM provides a callback for
|
||||
// container relaunch (see YARN-8265).
|
||||
cancelOnSuccess = false;
|
||||
}
|
||||
compInstance.containerStatusFuture =
|
||||
compInstance.scheduler.executorService.scheduleAtFixedRate(
|
||||
new ContainerStatusRetriever(compInstance.scheduler,
|
||||
event.getContainerId(), compInstance), 0, 1,
|
||||
event.getContainerId(), compInstance, cancelOnSuccess), 0, 1,
|
||||
TimeUnit.SECONDS);
|
||||
long containerStartTime = System.currentTimeMillis();
|
||||
try {
|
||||
|
@ -373,15 +383,27 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|||
this.status = status;
|
||||
org.apache.hadoop.yarn.service.api.records.Container container =
|
||||
getCompSpec().getContainer(status.getContainerId().toString());
|
||||
boolean doRegistryUpdate = true;
|
||||
if (container != null) {
|
||||
container.setIp(StringUtils.join(",", status.getIPs()));
|
||||
String existingIP = container.getIp();
|
||||
String newIP = StringUtils.join(",", status.getIPs());
|
||||
container.setIp(newIP);
|
||||
container.setHostname(status.getHost());
|
||||
if (timelineServiceEnabled) {
|
||||
if (existingIP != null && newIP.equals(existingIP)) {
|
||||
doRegistryUpdate = false;
|
||||
}
|
||||
if (timelineServiceEnabled && doRegistryUpdate) {
|
||||
serviceTimelinePublisher.componentInstanceIPHostUpdated(container);
|
||||
}
|
||||
}
|
||||
if (doRegistryUpdate) {
|
||||
cleanupRegistry(status.getContainerId());
|
||||
LOG.info(
|
||||
getCompInstanceId() + " new IP = " + status.getIPs() + ", host = "
|
||||
+ status.getHost() + ", updating registry");
|
||||
updateServiceRecord(yarnRegistryOperations, status);
|
||||
}
|
||||
}
|
||||
|
||||
public String getCompName() {
|
||||
return compInstanceId.getCompName();
|
||||
|
@ -522,12 +544,15 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|||
private NodeId nodeId;
|
||||
private NMClient nmClient;
|
||||
private ComponentInstance instance;
|
||||
private boolean cancelOnSuccess;
|
||||
ContainerStatusRetriever(ServiceScheduler scheduler,
|
||||
ContainerId containerId, ComponentInstance instance) {
|
||||
ContainerId containerId, ComponentInstance instance, boolean
|
||||
cancelOnSuccess) {
|
||||
this.containerId = containerId;
|
||||
this.nodeId = instance.getNodeId();
|
||||
this.nmClient = scheduler.getNmClient().getClient();
|
||||
this.instance = instance;
|
||||
this.cancelOnSuccess = cancelOnSuccess;
|
||||
}
|
||||
@Override public void run() {
|
||||
ContainerStatus status = null;
|
||||
|
@ -548,12 +573,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|||
return;
|
||||
}
|
||||
instance.updateContainerStatus(status);
|
||||
if (cancelOnSuccess) {
|
||||
LOG.info(
|
||||
instance.compInstanceId + " IP = " + status.getIPs() + ", host = "
|
||||
+ status.getHost() + ", cancel container status retriever");
|
||||
instance.containerStatusFuture.cancel(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cancelContainerStatusRetriever() {
|
||||
if (containerStatusFuture != null && !containerStatusFuture.isDone()) {
|
||||
|
|
|
@ -317,6 +317,14 @@ public class MockServiceAM extends ServiceMaster {
|
|||
}
|
||||
}
|
||||
|
||||
public Container updateContainerStatus(Service service, int id,
|
||||
String compName, String host) {
|
||||
ContainerId containerId = createContainerId(id);
|
||||
Container container = createContainer(containerId, compName);
|
||||
addContainerStatus(container, ContainerState.RUNNING, host);
|
||||
return container;
|
||||
}
|
||||
|
||||
public ContainerId createContainerId(int id) {
|
||||
ApplicationId applicationId = ApplicationId.fromString(service.getId());
|
||||
return ContainerId.newContainerId(
|
||||
|
@ -389,10 +397,15 @@ public class MockServiceAM extends ServiceMaster {
|
|||
}
|
||||
|
||||
private void addContainerStatus(Container container, ContainerState state) {
|
||||
addContainerStatus(container, state, container.getNodeId().getHost());
|
||||
}
|
||||
|
||||
private void addContainerStatus(Container container, ContainerState state,
|
||||
String host) {
|
||||
ContainerStatus status = ContainerStatus.newInstance(container.getId(),
|
||||
state, "", 0);
|
||||
status.setHost(container.getNodeId().getHost());
|
||||
status.setIPs(Lists.newArrayList(container.getNodeId().getHost()));
|
||||
status.setHost(host);
|
||||
status.setIPs(Lists.newArrayList(host));
|
||||
containerStatuses.put(container.getId(), status);
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
|
|||
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.DockerCredentialTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
|
@ -349,4 +350,45 @@ public class TestServiceAM extends ServiceTestUtils{
|
|||
|
||||
am.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIPChange() throws TimeoutException,
|
||||
InterruptedException {
|
||||
ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
|
||||
String comp1Name = "comp1";
|
||||
String comp1InstName = "comp1-0";
|
||||
Service exampleApp = new Service();
|
||||
exampleApp.setId(applicationId.toString());
|
||||
exampleApp.setVersion("v1");
|
||||
exampleApp.setName("testIPChange");
|
||||
Component comp1 = createComponent(comp1Name, 1, "sleep 60");
|
||||
comp1.setArtifact(new Artifact().type(Artifact.TypeEnum.DOCKER));
|
||||
exampleApp.addComponent(comp1);
|
||||
|
||||
MockServiceAM am = new MockServiceAM(exampleApp);
|
||||
am.init(conf);
|
||||
am.start();
|
||||
|
||||
ComponentInstance comp1inst0 = am.getCompInstance(comp1Name, comp1InstName);
|
||||
// allocate a container
|
||||
am.feedContainerToComp(exampleApp, 1, comp1Name);
|
||||
GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus() != null,
|
||||
2000, 200000);
|
||||
// first host status will match the container nodeId
|
||||
Assert.assertEquals("localhost",
|
||||
comp1inst0.getContainerStatus().getHost());
|
||||
|
||||
LOG.info("Change the IP and host");
|
||||
// change the container status
|
||||
am.updateContainerStatus(exampleApp, 1, comp1Name, "new.host");
|
||||
GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus().getHost()
|
||||
.equals("new.host"), 2000, 200000);
|
||||
|
||||
LOG.info("Change the IP and host again");
|
||||
// change the container status
|
||||
am.updateContainerStatus(exampleApp, 1, comp1Name, "newer.host");
|
||||
GenericTestUtils.waitFor(() -> comp1inst0.getContainerStatus().getHost()
|
||||
.equals("newer.host"), 2000, 200000);
|
||||
am.stop();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/** Unit tests for DockerClient. */
|
||||
public class TestDockerClient {
|
||||
private static final File TEST_ROOT_DIR = GenericTestUtils.getTestDir(
|
||||
TestDockerClient.class.getName());
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
TEST_ROOT_DIR.mkdirs();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() {
|
||||
FileUtil.fullyDelete(TEST_ROOT_DIR);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteCommandToTempFile() throws Exception {
|
||||
String absRoot = TEST_ROOT_DIR.getAbsolutePath();
|
||||
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
|
||||
DockerCommand dockerCmd = new DockerInspectCommand(cid.toString());
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("hadoop.tmp.dir", absRoot);
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, absRoot);
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, absRoot);
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
Context mockContext = mock(Context.class);
|
||||
doReturn(conf).when(mockContext).getConf();
|
||||
doReturn(dirsHandler).when(mockContext).getLocalDirsHandler();
|
||||
|
||||
DockerClient dockerClient = new DockerClient(conf);
|
||||
dirsHandler.init(conf);
|
||||
dirsHandler.start();
|
||||
String tmpPath = dockerClient.writeCommandToTempFile(dockerCmd, cid,
|
||||
mockContext);
|
||||
dirsHandler.stop();
|
||||
File tmpFile = new File(tmpPath);
|
||||
assertTrue(tmpFile + " was not created", tmpFile.exists());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue