Merge r1529039 and r1529048 from trunk to branch-2 for YARN-1256. NM silently ignores non-existent service in StartContainerRequest (Xuan Gong via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1529049 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-10-04 01:04:26 +00:00
parent 75aeb6070a
commit 7b2cd4b411
7 changed files with 171 additions and 34 deletions

View File

@ -129,6 +129,9 @@ Release 2.1.2 - UNRELEASED
YARN-1236. FairScheduler setting queue name in RMApp is not working. YARN-1236. FairScheduler setting queue name in RMApp is not working.
(Sandy Ryza) (Sandy Ryza)
YARN-1256. NM silently ignores non-existent service in
StartContainerRequest (Xuan Gong via bikas)
YARN-1149. NM throws InvalidStateTransitonException: Invalid event: YARN-1149. NM throws InvalidStateTransitonException: Invalid event:
APPLICATION_LOG_HANDLING_FINISHED at RUNNING (Xuan Gong via hitesh) APPLICATION_LOG_HANDLING_FINISHED at RUNNING (Xuan Gong via hitesh)

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.exceptions;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
/**
* This exception is thrown by a NodeManager when it rejects a start-container
* request made via
* {@link ContainerManagementProtocol#startContainers(StartContainersRequest)}
* because the request refers to an auxiliary service that does not exist.
*/
public class InvalidAuxServiceException extends YarnException {
// Fixed serialization version for this exception type.
private static final long serialVersionUID = 1L;
/**
* Creates the exception with the given detail message.
*
* @param msg detail message, forwarded unchanged to the
* {@link YarnException} constructor
*/
public InvalidAuxServiceException(String msg) {
super(msg);
}
}

View File

@ -30,8 +30,11 @@ public class AuxiliaryServiceHelper {
public static ByteBuffer getServiceDataFromEnv(String serviceName, public static ByteBuffer getServiceDataFromEnv(String serviceName,
Map<String, String> env) { Map<String, String> env) {
byte[] metaData = String meta = env.get(getPrefixServiceName(serviceName));
Base64.decodeBase64(env.get(getPrefixServiceName(serviceName))); if (null == meta) {
return null;
}
byte[] metaData = Base64.decodeBase64(meta);
return ByteBuffer.wrap(metaData); return ByteBuffer.wrap(metaData);
} }

View File

@ -175,39 +175,56 @@ public class AuxServices extends AbstractService
LOG.info("Got event " + event.getType() + " for appId " LOG.info("Got event " + event.getType() + " for appId "
+ event.getApplicationID()); + event.getApplicationID());
switch (event.getType()) { switch (event.getType()) {
case APPLICATION_INIT: case APPLICATION_INIT:
LOG.info("Got APPLICATION_INIT for service " + event.getServiceID()); LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
AuxiliaryService service = serviceMap.get(event.getServiceID()); AuxiliaryService service = null;
if (null == service) { try {
LOG.info("service is null"); service = serviceMap.get(event.getServiceID());
// TODO kill all containers waiting on Application service
return; .initializeApplication(new ApplicationInitializationContext(event
} .getUser(), event.getApplicationID(), event.getServiceData()));
service.initializeApplication(new ApplicationInitializationContext(event } catch (Throwable th) {
.getUser(), event.getApplicationID(), event.getServiceData())); logWarningWhenAuxServiceThrowExceptions(service,
break; AuxServicesEventType.APPLICATION_INIT, th);
case APPLICATION_STOP: }
for (AuxiliaryService serv : serviceMap.values()) { break;
serv.stopApplication(new ApplicationTerminationContext(event case APPLICATION_STOP:
.getApplicationID())); for (AuxiliaryService serv : serviceMap.values()) {
} try {
break; serv.stopApplication(new ApplicationTerminationContext(event
case CONTAINER_INIT: .getApplicationID()));
for (AuxiliaryService serv : serviceMap.values()) { } catch (Throwable th) {
serv.initializeContainer(new ContainerInitializationContext( logWarningWhenAuxServiceThrowExceptions(serv,
event.getUser(), event.getContainer().getContainerId(), AuxServicesEventType.APPLICATION_STOP, th);
event.getContainer().getResource())); }
} }
break; break;
case CONTAINER_STOP: case CONTAINER_INIT:
for (AuxiliaryService serv : serviceMap.values()) { for (AuxiliaryService serv : serviceMap.values()) {
serv.stopContainer(new ContainerTerminationContext( try {
event.getUser(), event.getContainer().getContainerId(), serv.initializeContainer(new ContainerInitializationContext(
event.getContainer().getResource())); event.getUser(), event.getContainer().getContainerId(),
} event.getContainer().getResource()));
break; } catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(serv,
AuxServicesEventType.CONTAINER_INIT, th);
}
}
break;
case CONTAINER_STOP:
for (AuxiliaryService serv : serviceMap.values()) {
try {
serv.stopContainer(new ContainerTerminationContext(
event.getUser(), event.getContainer().getContainerId(),
event.getContainer().getResource()));
} catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(serv,
AuxServicesEventType.CONTAINER_STOP, th);
}
}
break;
default: default:
throw new RuntimeException("Unknown type: " + event.getType()); throw new RuntimeException("Unknown type: " + event.getType());
} }
} }
@ -217,4 +234,11 @@ public class AuxServices extends AbstractService
} }
return p.matcher(name).matches(); return p.matcher(name).matches();
} }
/**
 * Logs a warning describing a failure raised while an auxiliary service was
 * handling an event.
 *
 * @param service the auxiliary service that was handling the event; may be
 *          {@code null}, in which case the message says so instead of
 *          naming the service
 * @param eventType the type of event that was being handled
 * @param th the error that was caught; logged together with the message
 */
private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service,
    AuxServicesEventType eventType, Throwable th) {
  final String subject;
  if (service == null) {
    subject = "The auxService is null";
  } else {
    subject = "The auxService name is " + service.getName();
  }
  LOG.warn(subject + " and it got an error at event: " + eventType, th);
}
} }

View File

@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerException; import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -541,6 +542,18 @@ public class ContainerManagerImpl extends CompositeService implements
ContainerLaunchContext launchContext = request.getContainerLaunchContext(); ContainerLaunchContext launchContext = request.getContainerLaunchContext();
Map<String, ByteBuffer> serviceData = getAuxServiceMetaData();
if (launchContext.getServiceData()!=null &&
!launchContext.getServiceData().isEmpty()) {
for (Map.Entry<String, ByteBuffer> meta : launchContext.getServiceData()
.entrySet()) {
if (null == serviceData.get(meta.getKey())) {
throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
+ " does not exist");
}
}
}
Credentials credentials = parseCredentials(launchContext); Credentials credentials = parseCredentials(launchContext);
Container container = Container container =

View File

@ -178,6 +178,17 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
super.testMultipleContainersStopAndGetStatus(); super.testMultipleContainersStopAndGetStatus();
} }
/**
 * Runs the inherited unknown-aux-service start-container test, but only
 * when {@code shouldRunTest()} reports that the test may run; otherwise the
 * test is skipped with an informational log line.
 */
@Override
public void testStartContainerFailureWithUnknownAuxService() throws Exception {
  // Don't run the test if the binary is not available.
  if (!shouldRunTest()) {
    LOG.info("LCE binary path is not passed. Not running the test");
    return;
  }
  // Log the name of the test actually being run (the previous message named
  // a different test, which made the test logs misleading).
  LOG.info("Running testStartContainerFailureWithUnknownAuxService");
  super.testStartContainerFailureWithUnknownAuxService();
}
private boolean shouldRunTest() { private boolean shouldRunTest() {
return System return System
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null; .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;

View File

@ -24,6 +24,7 @@ import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.InetAddress; import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@ -37,6 +38,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidContainerException; import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@ -68,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
@ -746,6 +750,48 @@ public class TestContainerManager extends BaseContainerManagerTest {
} }
} }
/**
 * Verifies that a start-container request whose launch context carries
 * service data for an aux service that is not configured is reported as a
 * failed request, with a message naming the missing service, rather than
 * being started.
 */
@Test
public void testStartContainerFailureWithUnknownAuxService() throws Exception {
  // Configure exactly one aux service so that any other name is unknown.
  conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
      new String[] { "existService" });
  conf.setClass(
      String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "existService"),
      ServiceA.class, Service.class);
  containerManager.start();

  List<StartContainerRequest> startRequest =
      new ArrayList<StartContainerRequest>();

  ContainerLaunchContext containerLaunchContext =
      recordFactory.newRecordInstance(ContainerLaunchContext.class);
  // Attach service data keyed by a service name that was never configured.
  Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
  String serviceName = "non_exist_auxService";
  serviceData.put(serviceName, ByteBuffer.wrap(serviceName.getBytes()));
  containerLaunchContext.setServiceData(serviceData);

  ContainerId cId = createContainerId(0);
  String user = "start_container_fail";
  Token containerToken =
      createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
          user, context.getContainerTokenSecretManager());
  StartContainerRequest request =
      StartContainerRequest.newInstance(containerLaunchContext,
          containerToken);

  // start containers
  startRequest.add(request);
  StartContainersRequest requestList =
      StartContainersRequest.newInstance(startRequest);

  StartContainersResponse response =
      containerManager.startContainers(requestList);

  // assertEquals reports the actual size on failure, unlike
  // assertTrue(size == n).
  Assert.assertEquals(1, response.getFailedRequests().size());
  Assert.assertEquals(0, response.getSuccessfullyStartedContainers().size());
  Assert.assertTrue(response.getFailedRequests().containsKey(cId));
  Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
      .contains("The auxService:" + serviceName + " does not exist"));
}
public static Token createContainerToken(ContainerId cId, long rmIdentifier, public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user, NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager) NMContainerTokenSecretManager containerTokenSecretManager)