MAPREDUCE-3055. Simplified ApplicationAttemptId passing to ApplicationMaster via environment variable. (vinodkv)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1174785 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
272639877c
commit
b549c10782
|
@ -310,6 +310,9 @@ Release 0.23.0 - Unreleased
|
||||||
MAPREDUCE-2726. Added job-file to the AM and JobHistoryServer web
|
MAPREDUCE-2726. Added job-file to the AM and JobHistoryServer web
|
||||||
interfaces. (Jeffrey Naisbitt via vinodkv)
|
interfaces. (Jeffrey Naisbitt via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-3055. Simplified ApplicationAttemptId passing to
|
||||||
|
ApplicationMaster via environment variable. (vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
|
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
|
||||||
|
|
|
@ -77,6 +77,7 @@ import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.Clock;
|
import org.apache.hadoop.yarn.Clock;
|
||||||
import org.apache.hadoop.yarn.SystemClock;
|
import org.apache.hadoop.yarn.SystemClock;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -87,6 +88,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.service.CompositeService;
|
import org.apache.hadoop.yarn.service.CompositeService;
|
||||||
import org.apache.hadoop.yarn.service.Service;
|
import org.apache.hadoop.yarn.service.Service;
|
||||||
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The Map-Reduce Application Master.
|
* The Map-Reduce Application Master.
|
||||||
|
@ -647,13 +649,18 @@ public class MRAppMaster extends CompositeService {
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
try {
|
try {
|
||||||
//Configuration.addDefaultResource("job.xml");
|
String applicationAttemptIdStr = System
|
||||||
ApplicationId applicationId = RecordFactoryProvider
|
.getenv(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV);
|
||||||
.getRecordFactory(null).newRecordInstance(ApplicationId.class);
|
if (applicationAttemptIdStr == null) {
|
||||||
applicationId.setClusterTimestamp(Long.valueOf(args[0]));
|
String msg = ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV
|
||||||
applicationId.setId(Integer.valueOf(args[1]));
|
+ " is null";
|
||||||
int failCount = Integer.valueOf(args[2]);
|
LOG.error(msg);
|
||||||
MRAppMaster appMaster = new MRAppMaster(applicationId, failCount);
|
throw new IOException(msg);
|
||||||
|
}
|
||||||
|
ApplicationAttemptId applicationAttemptId = ConverterUtils
|
||||||
|
.toApplicationAttemptId(applicationAttemptIdStr);
|
||||||
|
MRAppMaster appMaster = new MRAppMaster(applicationAttemptId
|
||||||
|
.getApplicationId(), applicationAttemptId.getAttemptId());
|
||||||
Runtime.getRuntime().addShutdownHook(
|
Runtime.getRuntime().addShutdownHook(
|
||||||
new CompositeServiceShutdownHook(appMaster));
|
new CompositeServiceShutdownHook(appMaster));
|
||||||
YarnConfiguration conf = new YarnConfiguration(new JobConf());
|
YarnConfiguration conf = new YarnConfiguration(new JobConf());
|
||||||
|
|
|
@ -321,9 +321,6 @@ public class YARNRunner implements ClientProtocol {
|
||||||
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
|
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
|
||||||
|
|
||||||
vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
|
vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
|
||||||
vargs.add(String.valueOf(applicationId.getClusterTimestamp()));
|
|
||||||
vargs.add(String.valueOf(applicationId.getId()));
|
|
||||||
vargs.add(ApplicationConstants.AM_FAIL_COUNT_STRING);
|
|
||||||
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
|
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
|
||||||
Path.SEPARATOR + ApplicationConstants.STDOUT);
|
Path.SEPARATOR + ApplicationConstants.STDOUT);
|
||||||
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
|
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
|
||||||
|
|
|
@ -37,8 +37,11 @@ public interface ApplicationConstants {
|
||||||
public static final String APPLICATION_CLIENT_SECRET_ENV_NAME =
|
public static final String APPLICATION_CLIENT_SECRET_ENV_NAME =
|
||||||
"AppClientTokenEnv";
|
"AppClientTokenEnv";
|
||||||
|
|
||||||
// TODO: Weird. This is part of AM command line. Instead it should be a env.
|
/**
|
||||||
public static final String AM_FAIL_COUNT_STRING = "<FAILCOUNT>";
|
* The environmental variable for APPLICATION_ATTEMPT_ID. Set in
|
||||||
|
* ApplicationMaster's environment only.
|
||||||
|
*/
|
||||||
|
public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID";
|
||||||
|
|
||||||
public static final String CONTAINER_TOKEN_FILE_ENV_NAME =
|
public static final String CONTAINER_TOKEN_FILE_ENV_NAME =
|
||||||
UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
|
UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.util;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.util.StringHelper._split;
|
import static org.apache.hadoop.yarn.util.StringHelper._split;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.text.NumberFormat;
|
import java.text.NumberFormat;
|
||||||
|
@ -45,6 +46,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
public class ConverterUtils {
|
public class ConverterUtils {
|
||||||
|
|
||||||
public static final String APPLICATION_PREFIX = "application";
|
public static final String APPLICATION_PREFIX = "application";
|
||||||
|
public static final String CONTAINER_PREFIX = "container";
|
||||||
|
public static final String APPLICATION_ATTEMPT_PREFIX = "appattempt";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* return a hadoop path from a given url
|
* return a hadoop path from a given url
|
||||||
|
@ -132,14 +135,12 @@ public class ConverterUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ApplicationAttemptId toApplicationAttemptId(
|
private static ApplicationAttemptId toApplicationAttemptId(
|
||||||
RecordFactory recordFactory,
|
Iterator<String> it) throws NumberFormatException {
|
||||||
Iterator<String> it) {
|
ApplicationId appId = Records.newRecord(ApplicationId.class);
|
||||||
ApplicationId appId =
|
|
||||||
recordFactory.newRecordInstance(ApplicationId.class);
|
|
||||||
appId.setClusterTimestamp(Long.parseLong(it.next()));
|
appId.setClusterTimestamp(Long.parseLong(it.next()));
|
||||||
appId.setId(Integer.parseInt(it.next()));
|
appId.setId(Integer.parseInt(it.next()));
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId = Records
|
||||||
recordFactory.newRecordInstance(ApplicationAttemptId.class);
|
.newRecord(ApplicationAttemptId.class);
|
||||||
appAttemptId.setApplicationId(appId);
|
appAttemptId.setApplicationId(appId);
|
||||||
appAttemptId.setAttemptId(Integer.parseInt(it.next()));
|
appAttemptId.setAttemptId(Integer.parseInt(it.next()));
|
||||||
return appAttemptId;
|
return appAttemptId;
|
||||||
|
@ -149,16 +150,35 @@ public class ConverterUtils {
|
||||||
return cId.toString();
|
return cId.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ContainerId toContainerId(RecordFactory recordFactory,
|
public static ContainerId toContainerId(String containerIdStr)
|
||||||
String containerIdStr) {
|
throws IOException {
|
||||||
Iterator<String> it = _split(containerIdStr).iterator();
|
Iterator<String> it = _split(containerIdStr).iterator();
|
||||||
it.next(); // prefix. TODO: Validate container prefix
|
if (!it.next().equals(CONTAINER_PREFIX)) {
|
||||||
ApplicationAttemptId appAttemptID =
|
throw new IOException("Invalid ContainerId prefix: " + containerIdStr);
|
||||||
toApplicationAttemptId(recordFactory, it);
|
}
|
||||||
ContainerId containerId =
|
try {
|
||||||
recordFactory.newRecordInstance(ContainerId.class);
|
ApplicationAttemptId appAttemptID = toApplicationAttemptId(it);
|
||||||
|
ContainerId containerId = Records.newRecord(ContainerId.class);
|
||||||
containerId.setApplicationAttemptId(appAttemptID);
|
containerId.setApplicationAttemptId(appAttemptID);
|
||||||
containerId.setId(Integer.parseInt(it.next()));
|
containerId.setId(Integer.parseInt(it.next()));
|
||||||
return containerId;
|
return containerId;
|
||||||
|
} catch (NumberFormatException n) {
|
||||||
|
throw new IOException("Invalid ContainerId: " + containerIdStr, n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ApplicationAttemptId toApplicationAttemptId(
|
||||||
|
String applicationAttmeptIdStr) throws IOException {
|
||||||
|
Iterator<String> it = _split(applicationAttmeptIdStr).iterator();
|
||||||
|
if (!it.next().equals(APPLICATION_ATTEMPT_PREFIX)) {
|
||||||
|
throw new IOException("Invalid AppAttemptId prefix: "
|
||||||
|
+ applicationAttmeptIdStr);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return toApplicationAttemptId(it);
|
||||||
|
} catch (NumberFormatException n) {
|
||||||
|
throw new IOException("Invalid AppAttemptId: "
|
||||||
|
+ applicationAttmeptIdStr, n);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,8 +31,6 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
|
@ -56,22 +54,26 @@ public class ContainerLogsPage extends NMView {
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final LocalDirAllocator logsSelector;
|
private final LocalDirAllocator logsSelector;
|
||||||
private final Context nmContext;
|
private final Context nmContext;
|
||||||
private final RecordFactory recordFactory;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ContainersLogsBlock(Configuration conf, Context context) {
|
public ContainersLogsBlock(Configuration conf, Context context) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.logsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
|
this.logsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
|
||||||
this.nmContext = context;
|
this.nmContext = context;
|
||||||
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void render(Block html) {
|
protected void render(Block html) {
|
||||||
DIV<Hamlet> div = html.div("#content");
|
DIV<Hamlet> div = html.div("#content");
|
||||||
|
|
||||||
ContainerId containerId =
|
ContainerId containerId;
|
||||||
ConverterUtils.toContainerId(this.recordFactory, $(CONTAINER_ID));
|
try {
|
||||||
|
containerId = ConverterUtils.toContainerId($(CONTAINER_ID));
|
||||||
|
} catch (IOException e) {
|
||||||
|
div.h1("Invalid containerId " + $(CONTAINER_ID))._();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
Container container = this.nmContext.getContainers().get(containerId);
|
Container container = this.nmContext.getContainers().get(containerId);
|
||||||
|
|
||||||
if (container == null) {
|
if (container == null) {
|
||||||
|
|
|
@ -18,16 +18,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.util.StringHelper.ujoin;
|
||||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
|
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
|
||||||
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
|
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
|
||||||
import static org.apache.hadoop.yarn.util.StringHelper.ujoin;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
@ -53,21 +52,23 @@ public class ContainerPage extends NMView implements NMWebParams {
|
||||||
|
|
||||||
public static class ContainerBlock extends HtmlBlock implements NMWebParams {
|
public static class ContainerBlock extends HtmlBlock implements NMWebParams {
|
||||||
|
|
||||||
private final Configuration conf;
|
|
||||||
private final Context nmContext;
|
private final Context nmContext;
|
||||||
private final RecordFactory recordFactory;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ContainerBlock(Configuration conf, Context nmContext) {
|
public ContainerBlock(Context nmContext) {
|
||||||
this.conf = conf;
|
|
||||||
this.nmContext = nmContext;
|
this.nmContext = nmContext;
|
||||||
this.recordFactory = RecordFactoryProvider.getRecordFactory(this.conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void render(Block html) {
|
protected void render(Block html) {
|
||||||
ContainerId containerID =
|
ContainerId containerID;
|
||||||
ConverterUtils.toContainerId(this.recordFactory, $(CONTAINER_ID));
|
try {
|
||||||
|
containerID = ConverterUtils.toContainerId($(CONTAINER_ID));
|
||||||
|
} catch (IOException e) {
|
||||||
|
html.p()._("Invalid containerId " + $(CONTAINER_ID))._();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
Container container = this.nmContext.getContainers().get(containerID);
|
Container container = this.nmContext.getContainers().get(containerID);
|
||||||
ContainerStatus containerData = container.cloneAndGetContainerStatus();
|
ContainerStatus containerData = container.cloneAndGetContainerStatus();
|
||||||
int exitCode = containerData.getExitStatus();
|
int exitCode = containerData.getExitStatus();
|
||||||
|
|
|
@ -136,7 +136,7 @@ public class AMLauncher implements Runnable {
|
||||||
containerMgrProxy.stopContainer(stopRequest);
|
containerMgrProxy.stopContainer(stopRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ContainerManager getContainerMgrProxy(
|
protected ContainerManager getContainerMgrProxy(
|
||||||
final ApplicationId applicationID) throws IOException {
|
final ApplicationId applicationID) throws IOException {
|
||||||
|
|
||||||
Container container = application.getMasterContainer();
|
Container container = application.getMasterContainer();
|
||||||
|
@ -173,23 +173,11 @@ public class AMLauncher implements Runnable {
|
||||||
// Construct the actual Container
|
// Construct the actual Container
|
||||||
ContainerLaunchContext container =
|
ContainerLaunchContext container =
|
||||||
applicationMasterContext.getAMContainerSpec();
|
applicationMasterContext.getAMContainerSpec();
|
||||||
StringBuilder mergedCommand = new StringBuilder();
|
LOG.info("Command to launch container "
|
||||||
String failCount = Integer.toString(application.getAppAttemptId()
|
+ containerID
|
||||||
.getAttemptId());
|
+ " : "
|
||||||
List<String> commandList = new ArrayList<String>();
|
+ StringUtils.arrayToString(container.getCommands().toArray(
|
||||||
for (String str : container.getCommands()) {
|
new String[0])));
|
||||||
// This is out-right wrong. AM FAIL count should be passed via env.
|
|
||||||
String result =
|
|
||||||
str.replaceFirst(ApplicationConstants.AM_FAIL_COUNT_STRING,
|
|
||||||
failCount);
|
|
||||||
mergedCommand.append(result).append(" ");
|
|
||||||
commandList.add(result);
|
|
||||||
}
|
|
||||||
container.setCommands(commandList);
|
|
||||||
/** add the failed count to the app master command line */
|
|
||||||
|
|
||||||
LOG.info("Command to launch container " +
|
|
||||||
containerID + " : " + mergedCommand);
|
|
||||||
|
|
||||||
// Finalize the container
|
// Finalize the container
|
||||||
container.setContainerId(containerID);
|
container.setContainerId(containerID);
|
||||||
|
@ -203,6 +191,11 @@ public class AMLauncher implements Runnable {
|
||||||
ContainerLaunchContext container)
|
ContainerLaunchContext container)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Map<String, String> environment = container.getEnvironment();
|
Map<String, String> environment = container.getEnvironment();
|
||||||
|
|
||||||
|
// Set the AppAttemptId to be consumable by the AM.
|
||||||
|
environment.put(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV,
|
||||||
|
application.getAppAttemptId().toString());
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
// TODO: Security enabled/disabled info should come from RM.
|
// TODO: Security enabled/disabled info should come from RM.
|
||||||
|
|
||||||
|
|
|
@ -42,9 +42,9 @@ public class ApplicationMasterLauncher extends AbstractService implements
|
||||||
private final BlockingQueue<Runnable> masterEvents
|
private final BlockingQueue<Runnable> masterEvents
|
||||||
= new LinkedBlockingQueue<Runnable>();
|
= new LinkedBlockingQueue<Runnable>();
|
||||||
|
|
||||||
private ApplicationTokenSecretManager applicationTokenSecretManager;
|
protected ApplicationTokenSecretManager applicationTokenSecretManager;
|
||||||
private ClientToAMSecretManager clientToAMSecretManager;
|
private ClientToAMSecretManager clientToAMSecretManager;
|
||||||
private final RMContext context;
|
protected final RMContext context;
|
||||||
|
|
||||||
public ApplicationMasterLauncher(ApplicationTokenSecretManager
|
public ApplicationMasterLauncher(ApplicationTokenSecretManager
|
||||||
applicationTokenSecretManager, ClientToAMSecretManager clientToAMSecretManager,
|
applicationTokenSecretManager, ClientToAMSecretManager clientToAMSecretManager,
|
||||||
|
|
|
@ -195,6 +195,7 @@ public class MockRM extends ResourceManager {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected AdminService createAdminService() {
|
protected AdminService createAdminService() {
|
||||||
return new AdminService(getConfig(), scheduler, getRMContext(),
|
return new AdminService(getConfig(), scheduler, getRMContext(),
|
||||||
this.nodesListManager){
|
this.nodesListManager){
|
||||||
|
|
|
@ -0,0 +1,159 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||||
|
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestApplicationMasterLauncher {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(TestApplicationMasterLauncher.class);
|
||||||
|
|
||||||
|
private static final class MyContainerManagerImpl implements
|
||||||
|
ContainerManager {
|
||||||
|
|
||||||
|
boolean launched = false;
|
||||||
|
boolean cleanedup = false;
|
||||||
|
String attemptIdAtContainerManager = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StartContainerResponse
|
||||||
|
startContainer(StartContainerRequest request)
|
||||||
|
throws YarnRemoteException {
|
||||||
|
LOG.info("Container started by MyContainerManager: " + request);
|
||||||
|
launched = true;
|
||||||
|
attemptIdAtContainerManager = request.getContainerLaunchContext()
|
||||||
|
.getEnvironment().get(
|
||||||
|
ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StopContainerResponse stopContainer(StopContainerRequest request)
|
||||||
|
throws YarnRemoteException {
|
||||||
|
LOG.info("Container cleaned up by MyContainerManager");
|
||||||
|
cleanedup = true;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetContainerStatusResponse getContainerStatus(
|
||||||
|
GetContainerStatusRequest request) throws YarnRemoteException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class MockRMWithCustomAMLauncher extends MockRM {
|
||||||
|
|
||||||
|
private final ContainerManager containerManager;
|
||||||
|
|
||||||
|
public MockRMWithCustomAMLauncher(ContainerManager containerManager) {
|
||||||
|
this.containerManager = containerManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ApplicationMasterLauncher createAMLauncher() {
|
||||||
|
return new ApplicationMasterLauncher(super.appTokenSecretManager,
|
||||||
|
super.clientToAMSecretManager, getRMContext()) {
|
||||||
|
@Override
|
||||||
|
protected Runnable createRunnableLauncher(RMAppAttempt application,
|
||||||
|
AMLauncherEventType event) {
|
||||||
|
return new AMLauncher(context, application, event,
|
||||||
|
applicationTokenSecretManager, clientToAMSecretManager,
|
||||||
|
getConfig()) {
|
||||||
|
@Override
|
||||||
|
protected ContainerManager getContainerMgrProxy(
|
||||||
|
ApplicationId applicationID) throws IOException {
|
||||||
|
return containerManager;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAMLaunchAndCleanup() throws Exception {
|
||||||
|
Logger rootLogger = LogManager.getRootLogger();
|
||||||
|
rootLogger.setLevel(Level.DEBUG);
|
||||||
|
MyContainerManagerImpl containerManager = new MyContainerManagerImpl();
|
||||||
|
MockRMWithCustomAMLauncher rm = new MockRMWithCustomAMLauncher(
|
||||||
|
containerManager);
|
||||||
|
rm.start();
|
||||||
|
MockNM nm1 = rm.registerNode("h1:1234", 5120);
|
||||||
|
|
||||||
|
RMApp app = rm.submitApp(2000);
|
||||||
|
|
||||||
|
// kick the scheduling
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
int waitCount = 0;
|
||||||
|
while (containerManager.launched == false && waitCount++ < 20) {
|
||||||
|
LOG.info("Waiting for AM Launch to happen..");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
Assert.assertTrue(containerManager.launched);
|
||||||
|
|
||||||
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||||
|
ApplicationAttemptId appAttemptId = attempt.getAppAttemptId();
|
||||||
|
Assert.assertEquals(appAttemptId.toString(),
|
||||||
|
containerManager.attemptIdAtContainerManager);
|
||||||
|
|
||||||
|
MockAM am = new MockAM(rm.getRMContext(), rm
|
||||||
|
.getApplicationMasterService(), appAttemptId);
|
||||||
|
am.registerAppAttempt();
|
||||||
|
am.unregisterAppAttempt();
|
||||||
|
|
||||||
|
waitCount = 0;
|
||||||
|
while (containerManager.cleanedup == false && waitCount++ < 20) {
|
||||||
|
LOG.info("Waiting for AM Cleanup to happen..");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
Assert.assertTrue(containerManager.cleanedup);
|
||||||
|
|
||||||
|
am.waitForState(RMAppAttemptState.FINISHED);
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,193 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import junit.framework.Assert;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationState;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
||||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
|
||||||
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Testing the applications manager launcher.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public class TestApplicationMasterLauncher {
|
|
||||||
// private static final Log LOG = LogFactory.getLog(TestApplicationMasterLauncher.class);
|
|
||||||
// private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
||||||
// private ApplicationMasterLauncher amLauncher;
|
|
||||||
// private DummyEventHandler asmHandle;
|
|
||||||
// private final ApplicationTokenSecretManager applicationTokenSecretManager =
|
|
||||||
// new ApplicationTokenSecretManager();
|
|
||||||
// private final ClientToAMSecretManager clientToAMSecretManager =
|
|
||||||
// new ClientToAMSecretManager();
|
|
||||||
//
|
|
||||||
// Object doneLaunching = new Object();
|
|
||||||
// AtomicInteger launched = new AtomicInteger();
|
|
||||||
// AtomicInteger cleanedUp = new AtomicInteger();
|
|
||||||
// private RMContext context = new RMContextImpl(new MemStore(), null, null,
|
|
||||||
// null);
|
|
||||||
//
|
|
||||||
// private Configuration conf = new Configuration();
|
|
||||||
//
|
|
||||||
// private class DummyEventHandler implements EventHandler<ApplicationEvent> {
|
|
||||||
// @Override
|
|
||||||
// public void handle(ApplicationEvent appEvent) {
|
|
||||||
// ApplicationEventType event = appEvent.getType();
|
|
||||||
// switch (event) {
|
|
||||||
// case FINISH:
|
|
||||||
// synchronized(doneLaunching) {
|
|
||||||
// doneLaunching.notify();
|
|
||||||
// }
|
|
||||||
// break;
|
|
||||||
//
|
|
||||||
// default:
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private class DummyLaunch implements Runnable {
|
|
||||||
// public void run() {
|
|
||||||
// launched.incrementAndGet();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private class DummyCleanUp implements Runnable {
|
|
||||||
// private EventHandler eventHandler;
|
|
||||||
//
|
|
||||||
// public DummyCleanUp(EventHandler eventHandler) {
|
|
||||||
// this.eventHandler = eventHandler;
|
|
||||||
// }
|
|
||||||
// public void run() {
|
|
||||||
// cleanedUp.incrementAndGet();
|
|
||||||
// eventHandler.handle(new AMFinishEvent(null,
|
|
||||||
// ApplicationState.COMPLETED, "", ""));
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private class DummyApplicationMasterLauncher extends
|
|
||||||
// ApplicationMasterLauncher {
|
|
||||||
// private EventHandler eventHandler;
|
|
||||||
//
|
|
||||||
// public DummyApplicationMasterLauncher(
|
|
||||||
// ApplicationTokenSecretManager applicationTokenSecretManager,
|
|
||||||
// ClientToAMSecretManager clientToAMSecretManager,
|
|
||||||
// EventHandler eventHandler) {
|
|
||||||
// super(applicationTokenSecretManager, clientToAMSecretManager, context);
|
|
||||||
// this.eventHandler = eventHandler;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @Override
|
|
||||||
// protected Runnable createRunnableLauncher(RMAppAttempt application,
|
|
||||||
// AMLauncherEventType event) {
|
|
||||||
// Runnable r = null;
|
|
||||||
// switch (event) {
|
|
||||||
// case LAUNCH:
|
|
||||||
// r = new DummyLaunch();
|
|
||||||
// break;
|
|
||||||
// case CLEANUP:
|
|
||||||
// r = new DummyCleanUp(eventHandler);
|
|
||||||
// default:
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// return r;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @Before
|
|
||||||
// public void setUp() {
|
|
||||||
// asmHandle = new DummyEventHandler();
|
|
||||||
// amLauncher = new DummyApplicationMasterLauncher(applicationTokenSecretManager,
|
|
||||||
// clientToAMSecretManager, asmHandle);
|
|
||||||
// context.getDispatcher().init(conf);
|
|
||||||
// amLauncher.init(conf);
|
|
||||||
// context.getDispatcher().start();
|
|
||||||
// amLauncher.start();
|
|
||||||
//
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @After
|
|
||||||
// public void tearDown() {
|
|
||||||
// amLauncher.stop();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @Test
|
|
||||||
// public void testAMLauncher() throws Exception {
|
|
||||||
//
|
|
||||||
// // Creat AppId
|
|
||||||
// ApplicationId appId = recordFactory
|
|
||||||
// .newRecordInstance(ApplicationId.class);
|
|
||||||
// appId.setClusterTimestamp(System.currentTimeMillis());
|
|
||||||
// appId.setId(1);
|
|
||||||
//
|
|
||||||
// ApplicationAttemptId appAttemptId = Records
|
|
||||||
// .newRecord(ApplicationAttemptId.class);
|
|
||||||
// appAttemptId.setApplicationId(appId);
|
|
||||||
// appAttemptId.setAttemptId(1);
|
|
||||||
//
|
|
||||||
// // Create submissionContext
|
|
||||||
// ApplicationSubmissionContext submissionContext = recordFactory
|
|
||||||
// .newRecordInstance(ApplicationSubmissionContext.class);
|
|
||||||
// submissionContext.setApplicationId(appId);
|
|
||||||
// submissionContext.setUser("dummyuser");
|
|
||||||
//
|
|
||||||
// RMAppAttempt appAttempt = new RMAppAttemptImpl(appAttemptId,
|
|
||||||
// "dummyclienttoken", context, null, submissionContext);
|
|
||||||
//
|
|
||||||
// // Tell AMLauncher to launch the appAttempt
|
|
||||||
// amLauncher.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH,
|
|
||||||
// appAttempt));
|
|
||||||
//
|
|
||||||
// // Tell AMLauncher to cleanup the appAttempt
|
|
||||||
// amLauncher.handle(new AMLauncherEvent(AMLauncherEventType.CLEANUP,
|
|
||||||
// appAttempt));
|
|
||||||
//
|
|
||||||
// synchronized (doneLaunching) {
|
|
||||||
// doneLaunching.wait(10000);
|
|
||||||
// }
|
|
||||||
// Assert.assertEquals(1, launched.get());
|
|
||||||
// Assert.assertEquals(1, cleanedUp.get());
|
|
||||||
// }
|
|
||||||
}
|
|
Loading…
Reference in New Issue