YARN-1256. NM silently ignores non-existent service in StartContainerRequest (Xuan Gong via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529039 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ac2cdb5f65
commit
8ebf37f369
|
@ -144,6 +144,9 @@ Release 2.1.2 - UNRELEASED
|
||||||
YARN-1236. FairScheduler setting queue name in RMApp is not working.
|
YARN-1236. FairScheduler setting queue name in RMApp is not working.
|
||||||
(Sandy Ryza)
|
(Sandy Ryza)
|
||||||
|
|
||||||
|
YARN-1256. NM silently ignores non-existent service in
|
||||||
|
StartContainerRequest (Xuan Gong via bikas)
|
||||||
|
|
||||||
Release 2.1.1-beta - 2013-09-23
|
Release 2.1.1-beta - 2013-09-23
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -30,8 +30,11 @@ public class AuxiliaryServiceHelper {
|
||||||
|
|
||||||
public static ByteBuffer getServiceDataFromEnv(String serviceName,
|
public static ByteBuffer getServiceDataFromEnv(String serviceName,
|
||||||
Map<String, String> env) {
|
Map<String, String> env) {
|
||||||
byte[] metaData =
|
String meta = env.get(getPrefixServiceName(serviceName));
|
||||||
Base64.decodeBase64(env.get(getPrefixServiceName(serviceName)));
|
if (null == meta) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
byte[] metaData = Base64.decodeBase64(meta);
|
||||||
return ByteBuffer.wrap(metaData);
|
return ByteBuffer.wrap(metaData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -175,39 +175,56 @@ public class AuxServices extends AbstractService
|
||||||
LOG.info("Got event " + event.getType() + " for appId "
|
LOG.info("Got event " + event.getType() + " for appId "
|
||||||
+ event.getApplicationID());
|
+ event.getApplicationID());
|
||||||
switch (event.getType()) {
|
switch (event.getType()) {
|
||||||
case APPLICATION_INIT:
|
case APPLICATION_INIT:
|
||||||
LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
|
LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
|
||||||
AuxiliaryService service = serviceMap.get(event.getServiceID());
|
AuxiliaryService service = null;
|
||||||
if (null == service) {
|
try {
|
||||||
LOG.info("service is null");
|
service = serviceMap.get(event.getServiceID());
|
||||||
// TODO kill all containers waiting on Application
|
service
|
||||||
return;
|
.initializeApplication(new ApplicationInitializationContext(event
|
||||||
}
|
.getUser(), event.getApplicationID(), event.getServiceData()));
|
||||||
service.initializeApplication(new ApplicationInitializationContext(event
|
} catch (Throwable th) {
|
||||||
.getUser(), event.getApplicationID(), event.getServiceData()));
|
logWarningWhenAuxServiceThrowExceptions(service,
|
||||||
break;
|
AuxServicesEventType.APPLICATION_INIT, th);
|
||||||
case APPLICATION_STOP:
|
}
|
||||||
for (AuxiliaryService serv : serviceMap.values()) {
|
break;
|
||||||
serv.stopApplication(new ApplicationTerminationContext(event
|
case APPLICATION_STOP:
|
||||||
.getApplicationID()));
|
for (AuxiliaryService serv : serviceMap.values()) {
|
||||||
}
|
try {
|
||||||
break;
|
serv.stopApplication(new ApplicationTerminationContext(event
|
||||||
case CONTAINER_INIT:
|
.getApplicationID()));
|
||||||
for (AuxiliaryService serv : serviceMap.values()) {
|
} catch (Throwable th) {
|
||||||
serv.initializeContainer(new ContainerInitializationContext(
|
logWarningWhenAuxServiceThrowExceptions(serv,
|
||||||
event.getUser(), event.getContainer().getContainerId(),
|
AuxServicesEventType.APPLICATION_STOP, th);
|
||||||
event.getContainer().getResource()));
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case CONTAINER_STOP:
|
case CONTAINER_INIT:
|
||||||
for (AuxiliaryService serv : serviceMap.values()) {
|
for (AuxiliaryService serv : serviceMap.values()) {
|
||||||
serv.stopContainer(new ContainerTerminationContext(
|
try {
|
||||||
event.getUser(), event.getContainer().getContainerId(),
|
serv.initializeContainer(new ContainerInitializationContext(
|
||||||
event.getContainer().getResource()));
|
event.getUser(), event.getContainer().getContainerId(),
|
||||||
}
|
event.getContainer().getResource()));
|
||||||
break;
|
} catch (Throwable th) {
|
||||||
|
logWarningWhenAuxServiceThrowExceptions(serv,
|
||||||
|
AuxServicesEventType.CONTAINER_INIT, th);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case CONTAINER_STOP:
|
||||||
|
for (AuxiliaryService serv : serviceMap.values()) {
|
||||||
|
try {
|
||||||
|
serv.stopContainer(new ContainerTerminationContext(
|
||||||
|
event.getUser(), event.getContainer().getContainerId(),
|
||||||
|
event.getContainer().getResource()));
|
||||||
|
} catch (Throwable th) {
|
||||||
|
logWarningWhenAuxServiceThrowExceptions(serv,
|
||||||
|
AuxServicesEventType.CONTAINER_STOP, th);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("Unknown type: " + event.getType());
|
throw new RuntimeException("Unknown type: " + event.getType());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,4 +234,11 @@ public class AuxServices extends AbstractService
|
||||||
}
|
}
|
||||||
return p.matcher(name).matches();
|
return p.matcher(name).matches();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service,
|
||||||
|
AuxServicesEventType eventType, Throwable th) {
|
||||||
|
LOG.warn((null == service ? "The auxService is null"
|
||||||
|
: "The auxService name is " + service.getName())
|
||||||
|
+ " and it got an error at event: " + eventType, th);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException;
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
|
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
|
||||||
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
|
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -451,6 +452,18 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
|
|
||||||
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
|
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
|
||||||
|
|
||||||
|
Map<String, ByteBuffer> serviceData = getAuxServiceMetaData();
|
||||||
|
if (launchContext.getServiceData()!=null &&
|
||||||
|
!launchContext.getServiceData().isEmpty()) {
|
||||||
|
for (Map.Entry<String, ByteBuffer> meta : launchContext.getServiceData()
|
||||||
|
.entrySet()) {
|
||||||
|
if (null == serviceData.get(meta.getKey())) {
|
||||||
|
throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
|
||||||
|
+ " does not exist");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Credentials credentials = parseCredentials(launchContext);
|
Credentials credentials = parseCredentials(launchContext);
|
||||||
|
|
||||||
Container container =
|
Container container =
|
||||||
|
|
|
@ -178,6 +178,17 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
|
||||||
super.testMultipleContainersStopAndGetStatus();
|
super.testMultipleContainersStopAndGetStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void testStartContainerFailureWithUnknownAuxService() throws Exception {
|
||||||
|
// Don't run the test if the binary is not available.
|
||||||
|
if (!shouldRunTest()) {
|
||||||
|
LOG.info("LCE binary path is not passed. Not running the test");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("Running testContainerLaunchFromPreviousRM");
|
||||||
|
super.testStartContainerFailureWithUnknownAuxService();
|
||||||
|
}
|
||||||
|
|
||||||
private boolean shouldRunTest() {
|
private boolean shouldRunTest() {
|
||||||
return System
|
return System
|
||||||
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
|
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.io.FileReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -37,6 +38,7 @@ import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
||||||
|
@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
|
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
|
@ -68,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
|
@ -746,6 +750,48 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStartContainerFailureWithUnknownAuxService() throws Exception {
|
||||||
|
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
|
||||||
|
new String[] { "existService" });
|
||||||
|
conf.setClass(
|
||||||
|
String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "existService"),
|
||||||
|
ServiceA.class, Service.class);
|
||||||
|
containerManager.start();
|
||||||
|
|
||||||
|
List<StartContainerRequest> startRequest =
|
||||||
|
new ArrayList<StartContainerRequest>();
|
||||||
|
|
||||||
|
ContainerLaunchContext containerLaunchContext =
|
||||||
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
||||||
|
String serviceName = "non_exist_auxService";
|
||||||
|
serviceData.put(serviceName, ByteBuffer.wrap(serviceName.getBytes()));
|
||||||
|
containerLaunchContext.setServiceData(serviceData);
|
||||||
|
|
||||||
|
ContainerId cId = createContainerId(0);
|
||||||
|
String user = "start_container_fail";
|
||||||
|
Token containerToken =
|
||||||
|
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
|
||||||
|
user, context.getContainerTokenSecretManager());
|
||||||
|
StartContainerRequest request =
|
||||||
|
StartContainerRequest.newInstance(containerLaunchContext,
|
||||||
|
containerToken);
|
||||||
|
|
||||||
|
// start containers
|
||||||
|
startRequest.add(request);
|
||||||
|
StartContainersRequest requestList =
|
||||||
|
StartContainersRequest.newInstance(startRequest);
|
||||||
|
|
||||||
|
StartContainersResponse response =
|
||||||
|
containerManager.startContainers(requestList);
|
||||||
|
Assert.assertTrue(response.getFailedRequests().size() == 1);
|
||||||
|
Assert.assertTrue(response.getSuccessfullyStartedContainers().size() == 0);
|
||||||
|
Assert.assertTrue(response.getFailedRequests().containsKey(cId));
|
||||||
|
Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
|
||||||
|
.contains("The auxService:" + serviceName + " does not exist"));
|
||||||
|
}
|
||||||
|
|
||||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||||
NodeId nodeId, String user,
|
NodeId nodeId, String user,
|
||||||
NMContainerTokenSecretManager containerTokenSecretManager)
|
NMContainerTokenSecretManager containerTokenSecretManager)
|
||||||
|
|
Loading…
Reference in New Issue