MAPREDUCE-5044. Have AM trigger jstack on task attempts that timeout before killing them. (Eric Payne and Gera Shegalov via mingma)

(cherry picked from commit 4a1cedc010d3fa1d8ef3f2773ca12acadfee5ba5)
(cherry picked from commit 74e2b5efa26f27027fed212b4b2108f0e95587fb)
Committed by Ming Ma on 2016-06-06 14:30:51 -07:00
parent f9478c95bd
commit ec4f9a14f9
34 changed files with 360 additions and 41 deletions
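
At a high level, the change teaches the MapReduce ApplicationMaster to request a thread dump from a timed-out task attempt's JVM before killing it: on TA_TIMED_OUT, the ContainerLauncher sends the new SignalContainerCommand.OUTPUT_THREAD_DUMP signal to the NodeManager (or, in uber mode, dumps its own threads) and then stops the container, so a jstack-style dump lands in the attempt's stdout log. The sketch below is illustrative only and not code from this commit; it assumes an already-resolved ContainerManagementProtocol proxy for the attempt's NodeManager and a known ContainerId.

import java.util.Arrays;

import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;

public class TimedOutAttemptCleanupSketch {
  // Rough shape of what ContainerLauncherImpl now does for a timed-out attempt.
  static void dumpThreadsThenKill(ContainerManagementProtocol proxy,
      ContainerId containerId) throws Exception {
    // 1. Ask the NodeManager to make the task JVM print a full thread dump.
    proxy.signalToContainer(SignalContainerRequest.newInstance(
        containerId, SignalContainerCommand.OUTPUT_THREAD_DUMP));
    // 2. Then stop the container, as the launcher already did before this change.
    proxy.stopContainers(StopContainersRequest.newInstance(
        Arrays.asList(containerId)));
  }
}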

View File

@ -20,6 +20,10 @@
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -255,6 +259,30 @@ public void run() {
} else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
if (event.getDumpContainerThreads()) {
try {
// Construct full thread dump header
System.out.println(new java.util.Date());
RuntimeMXBean rtBean = ManagementFactory.getRuntimeMXBean();
System.out.println("Full thread dump " + rtBean.getVmName()
+ " (" + rtBean.getVmVersion()
+ " " + rtBean.getSystemProperties().get("java.vm.info")
+ "):\n");
// Dump threads' states and stacks
ThreadMXBean tmxBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] tInfos = tmxBean.dumpAllThreads(
tmxBean.isObjectMonitorUsageSupported(),
tmxBean.isSynchronizerUsageSupported());
for (ThreadInfo ti : tInfos) {
System.out.println(ti.toString());
}
} catch (Throwable t) {
// Failure to dump stack shouldn't cause method failure.
System.out.println("Could not create full thread dump: "
+ t.getMessage());
}
}
// cancel (and interrupt) the current running task associated with the
// event
TaskAttemptId taId = event.getTaskAttemptID();
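
For reference only (not part of the commit): the dump produced above by the local, uber-mode launcher, where the task runs inside the AM JVM, can be reproduced in a standalone program with the same java.lang.management calls, which is handy for checking what the "Full thread dump" output looks like.

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadDumpSketch {
  public static void main(String[] args) {
    // jstack-style header: timestamp plus VM name/version.
    System.out.println(new java.util.Date());
    RuntimeMXBean rt = ManagementFactory.getRuntimeMXBean();
    System.out.println("Full thread dump " + rt.getVmName()
        + " (" + rt.getVmVersion()
        + " " + rt.getSystemProperties().get("java.vm.info") + "):\n");
    // One entry per live thread, with monitors/synchronizers where supported.
    ThreadMXBean tmx = ManagementFactory.getThreadMXBean();
    for (ThreadInfo ti : tmx.dumpAllThreads(
        tmx.isObjectMonitorUsageSupported(),
        tmx.isSynchronizerUsageSupported())) {
      System.out.print(ti);
    }
  }
}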

View File

@ -2168,7 +2168,8 @@ private static void sendContainerCleanup(TaskAttemptImpl taskAttempt,
taskAttempt.container.getId(), StringInterner
.weakIntern(taskAttempt.container.getNodeId().toString()),
taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP,
event.getType() == TaskAttemptEventType.TA_TIMED_OUT));
}
/**

View File

@ -30,17 +30,29 @@ public class ContainerLauncherEvent
private ContainerId containerID;
private String containerMgrAddress;
private Token containerToken;
private boolean dumpContainerThreads;
public ContainerLauncherEvent(TaskAttemptId taskAttemptID,
ContainerId containerID,
String containerMgrAddress,
Token containerToken,
ContainerLauncher.EventType type) {
this(taskAttemptID, containerID, containerMgrAddress, containerToken, type,
false);
}
public ContainerLauncherEvent(TaskAttemptId taskAttemptID,
ContainerId containerID,
String containerMgrAddress,
Token containerToken,
ContainerLauncher.EventType type,
boolean dumpContainerThreads) {
super(type);
this.taskAttemptID = taskAttemptID;
this.containerID = containerID;
this.containerMgrAddress = containerMgrAddress;
this.containerToken = containerToken;
this.dumpContainerThreads = dumpContainerThreads;
}
public TaskAttemptId getTaskAttemptID() {
@ -59,6 +71,10 @@ public Token getContainerToken() {
return containerToken;
}
public boolean getDumpContainerThreads() {
return dumpContainerThreads;
}
@Override
public String toString() {
return super.toString() + " for container " + containerID + " taskAttempt "
@ -77,6 +93,8 @@ public int hashCode() {
+ ((containerToken == null) ? 0 : containerToken.hashCode());
result = prime * result
+ ((taskAttemptID == null) ? 0 : taskAttemptID.hashCode());
result = prime * result
+ (dumpContainerThreads ? 1 : 0);
return result;
}
@ -109,7 +127,8 @@ public boolean equals(Object obj) {
return false;
} else if (!taskAttemptID.equals(other.taskAttemptID))
return false;
return true;
return dumpContainerThreads == other.dumpContainerThreads;
}
}

View File

@ -45,6 +45,8 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -52,6 +54,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -190,9 +193,13 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
}
}
}
public void kill() {
kill(false);
}
@SuppressWarnings("unchecked")
public synchronized void kill() {
public synchronized void kill(boolean dumpThreads) {
if(this.state == ContainerState.PREP) {
this.state = ContainerState.KILLED_BEFORE_LAUNCH;
@ -203,6 +210,13 @@ public synchronized void kill() {
try {
proxy = getCMProxy(this.containerMgrAddress, this.containerID);
if (dumpThreads) {
final SignalContainerRequest request = SignalContainerRequest
.newInstance(containerID,
SignalContainerCommand.OUTPUT_THREAD_DUMP);
proxy.getContainerManagementProtocol().signalToContainer(request);
}
// kill the remote container if already launched
List<ContainerId> ids = new ArrayList<ContainerId>();
ids.add(this.containerID);
@ -380,7 +394,7 @@ public void run() {
break;
case CONTAINER_REMOTE_CLEANUP:
c.kill();
c.kill(event.getDumpContainerThreads());
break;
case CONTAINER_COMPLETED:

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -58,6 +57,8 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -73,6 +74,7 @@
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
@ -460,5 +462,11 @@ public IncreaseContainersResourceResponse increaseContainersResource(
"Dummy function cause"));
throw new IOException(e);
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
return null;
}
}
}

View File

@ -50,6 +50,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@ -465,6 +467,12 @@ public IncreaseContainersResourceResponse increaseContainersResource(
@Override
public void close() throws IOException {
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
return null;
}
}
@SuppressWarnings("serial")

View File

@ -496,8 +496,9 @@ public void updateApplicationPriority(ApplicationId applicationId,
}
@Override
public void signalContainer(ContainerId containerId, SignalContainerCommand command)
public void signalToContainer(ContainerId containerId,
SignalContainerCommand command)
throws YarnException, IOException {
client.signalContainer(containerId, command);
client.signalToContainer(containerId, command);
}
}

View File

@ -481,7 +481,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
}
@Override
public SignalContainerResponse signalContainer(
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws IOException {
return null;
}

View File

@ -24,6 +24,7 @@
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
@ -989,6 +990,124 @@ public void testDistributedCache() throws Exception {
_testDistributedCache(remoteJobJarPath.toUri().toString());
}
@Test(timeout = 120000)
public void testThreadDumpOnTaskTimeout() throws IOException,
InterruptedException, ClassNotFoundException {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
final SleepJob sleepJob = new SleepJob();
final JobConf sleepConf = new JobConf(mrCluster.getConfig());
sleepConf.setLong(MRJobConfig.TASK_TIMEOUT, 3 * 1000L);
sleepConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
sleepJob.setConf(sleepConf);
if (this instanceof TestUberAM) {
sleepConf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS,
30 * 1000);
}
// make the map sleep far longer (10 minutes) than the 3-second task
// timeout, so the attempt is killed with a thread dump
final Job job = sleepJob.createJob(1, 0, 10 * 60 * 1000L, 1, 0L, 0);
job.setJarByClass(SleepJob.class);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.waitForCompletion(true);
final JobId jobId = TypeConverter.toYarn(job.getJobID());
final ApplicationId appID = jobId.getAppId();
int pollElapsed = 0;
while (true) {
Thread.sleep(1000);
pollElapsed += 1000;
if (TERMINAL_RM_APP_STATES.contains(mrCluster.getResourceManager()
.getRMContext().getRMApps().get(appID).getState())) {
break;
}
if (pollElapsed >= 60000) {
LOG.warn("application did not reach terminal state within 60 seconds");
break;
}
}
// Job finished, verify logs
//
final String appIdStr = appID.toString();
final String appIdSuffix = appIdStr.substring("application_".length(),
appIdStr.length());
final String containerGlob = "container_" + appIdSuffix + "_*_*";
final String syslogGlob = appIdStr
+ Path.SEPARATOR + containerGlob
+ Path.SEPARATOR + TaskLog.LogName.SYSLOG;
int numAppMasters = 0;
int numMapTasks = 0;
for (int i = 0; i < NUM_NODE_MGRS; i++) {
final Configuration nmConf = mrCluster.getNodeManager(i).getConfig();
for (String logDir :
nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)) {
final Path absSyslogGlob =
new Path(logDir + Path.SEPARATOR + syslogGlob);
LOG.info("Checking for glob: " + absSyslogGlob);
for (FileStatus syslog : localFs.globStatus(absSyslogGlob)) {
boolean foundAppMaster = false;
boolean foundThreadDump = false;
// Determine the container type
final BufferedReader syslogReader = new BufferedReader(
new InputStreamReader(localFs.open(syslog.getPath())));
try {
for (String line; (line = syslogReader.readLine()) != null; ) {
if (line.contains(MRAppMaster.class.getName())) {
foundAppMaster = true;
break;
}
}
} finally {
syslogReader.close();
}
// Check for thread dump in stdout
final Path stdoutPath = new Path(syslog.getPath().getParent(),
TaskLog.LogName.STDOUT.toString());
final BufferedReader stdoutReader = new BufferedReader(
new InputStreamReader(localFs.open(stdoutPath)));
try {
for (String line; (line = stdoutReader.readLine()) != null; ) {
if (line.contains("Full thread dump")) {
foundThreadDump = true;
break;
}
}
} finally {
stdoutReader.close();
}
if (foundAppMaster) {
numAppMasters++;
if (this instanceof TestUberAM) {
Assert.assertTrue("No thread dump", foundThreadDump);
} else {
Assert.assertFalse("Unexpected thread dump", foundThreadDump);
}
} else {
numMapTasks++;
Assert.assertTrue("No thread dump", foundThreadDump);
}
}
}
}
// Make sure we checked non-empty set
//
Assert.assertEquals("No AppMaster log found!", 1, numAppMasters);
if (sleepConf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false)) {
Assert.assertSame("MapTask log with uber found!", 0, numMapTasks);
} else {
Assert.assertSame("No MapTask log found!", 1, numMapTasks);
}
}
private Path createTempFile(String filename, String contents)
throws IOException {
Path path = new Path(TEST_ROOT_DIR, filename);

View File

@ -563,7 +563,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
*/
@Public
@Unstable
public SignalContainerResponse signalContainer(
SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException,
IOException;
}

View File

@ -27,6 +27,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -194,4 +196,7 @@ GetContainerStatusesResponse getContainerStatuses(
IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException,
IOException;
SignalContainerResponse signalToContainer(SignalContainerRequest request)
throws YarnException, IOException;
}

View File

@ -28,7 +28,7 @@
*
* <p>Currently it's empty.</p>
*
* @see ApplicationClientProtocol#signalContainer(SignalContainerRequest)
* @see ApplicationClientProtocol#signalToContainer(SignalContainerRequest)
*/
@Public
@Evolving

View File

@ -59,5 +59,5 @@ service ApplicationClientProtocolService {
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto);
rpc signalContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
}

View File

@ -35,4 +35,5 @@ service ContainerManagementProtocolService {
rpc stopContainers(StopContainersRequestProto) returns (StopContainersResponseProto);
rpc getContainerStatuses(GetContainerStatusesRequestProto) returns (GetContainerStatusesResponseProto);
rpc increaseContainersResource(IncreaseContainersResourceRequestProto) returns (IncreaseContainersResourceResponseProto);
rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
}

View File

@ -775,6 +775,6 @@ public abstract void updateApplicationPriority(ApplicationId applicationId,
* @throws YarnException
* @throws IOException
*/
public abstract void signalContainer(ContainerId containerId,
public abstract void signalToContainer(ContainerId containerId,
SignalContainerCommand command) throws YarnException, IOException;
}

View File

@ -858,12 +858,12 @@ public void updateApplicationPriority(ApplicationId applicationId,
}
@Override
public void signalContainer(ContainerId containerId,
public void signalToContainer(ContainerId containerId,
SignalContainerCommand command)
throws YarnException, IOException {
LOG.info("Signalling container " + containerId + " with command " + command);
SignalContainerRequest request =
SignalContainerRequest.newInstance(containerId, command);
rmClient.signalContainer(request);
rmClient.signalToContainer(request);
}
}
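
On the client side, the renamed YarnClient API is used just as the CLI and tests below do; a minimal illustrative sketch (not part of the commit), assuming a running cluster and a known ContainerId:

import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class SignalToContainerSketch {
  static void requestThreadDump(ContainerId containerId) throws Exception {
    YarnClient client = YarnClient.createYarnClient();
    client.init(new YarnConfiguration());
    client.start();
    try {
      // The RM records the signal and the owning NM picks it up on its next
      // heartbeat; the resulting dump ends up in the container's stdout log.
      client.signalToContainer(containerId,
          SignalContainerCommand.OUTPUT_THREAD_DUMP);
    } finally {
      client.stop();
    }
  }
}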

View File

@ -284,7 +284,7 @@ public int run(String[] args) throws Exception {
if (signalArgs.length == 2) {
command = SignalContainerCommand.valueOf(signalArgs[1]);
}
signalContainer(containerId, command);
signalToContainer(containerId, command);
} else {
syserr.println("Invalid Command Usage : ");
printUsage(title, opts);
@ -299,11 +299,11 @@ public int run(String[] args) throws Exception {
* @param command the signal command
* @throws YarnException
*/
private void signalContainer(String containerIdStr,
private void signalToContainer(String containerIdStr,
SignalContainerCommand command) throws YarnException, IOException {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
sysout.println("Signalling container " + containerIdStr);
client.signalContainer(containerId, command);
client.signalToContainer(containerId, command);
}
/**

View File

@ -1686,11 +1686,11 @@ public void testSignalContainer() throws Exception {
applicationId, 1);
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
SignalContainerCommand command = SignalContainerCommand.OUTPUT_THREAD_DUMP;
client.signalContainer(containerId, command);
client.signalToContainer(containerId, command);
final ArgumentCaptor<SignalContainerRequest> signalReqCaptor =
ArgumentCaptor.forClass(SignalContainerRequest.class);
verify(((MockYarnClient) client).getRMClient())
.signalContainer(signalReqCaptor.capture());
.signalToContainer(signalReqCaptor.capture());
SignalContainerRequest request = signalReqCaptor.getValue();
Assert.assertEquals(containerId, request.getContainerId());
Assert.assertEquals(command, request.getCommand());

View File

@ -21,6 +21,11 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.yarn.proto.ContainerManagementProtocol.ContainerManagementProtocolService;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@Private
@Unstable
@ -29,4 +34,6 @@
protocolVersion = 1)
public interface ContainerManagementProtocolPB extends ContainerManagementProtocolService.BlockingInterface {
SignalContainerResponseProto signalToContainer(RpcController arg0,
SignalContainerRequestProto proto) throws ServiceException;
}

View File

@ -588,13 +588,13 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
}
@Override
public SignalContainerResponse signalContainer(
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
YarnServiceProtos.SignalContainerRequestProto requestProto =
((SignalContainerRequestPBImpl) request).getProto();
try {
return new SignalContainerResponsePBImpl(
proxy.signalContainer(null, requestProto));
proxy.signalToContainer(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;

View File

@ -34,6 +34,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@ -42,6 +44,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
@ -50,6 +54,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
@ -148,4 +153,18 @@ public IncreaseContainersResourceResponse increaseContainersResource(
return null;
}
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
SignalContainerRequestProto requestProto =
((SignalContainerRequestPBImpl) request).getProto();
try {
return new SignalContainerResponsePBImpl(
proxy.signalToContainer(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
}

View File

@ -596,11 +596,12 @@ public UpdateApplicationPriorityResponseProto updateApplicationPriority(
}
@Override
public SignalContainerResponseProto signalContainer(RpcController controller,
public SignalContainerResponseProto signalToContainer(
RpcController controller,
YarnServiceProtos.SignalContainerRequestProto proto) throws ServiceException {
SignalContainerRequestPBImpl request = new SignalContainerRequestPBImpl(proto);
try {
SignalContainerResponse response = real.signalContainer(request);
SignalContainerResponse response = real.signalToContainer(request);
return ((SignalContainerResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);

View File

@ -25,12 +25,15 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
@ -40,6 +43,8 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto;
@ -116,4 +121,19 @@ public IncreaseContainersResourceResponseProto increaseContainersResource(
throw new ServiceException(e);
}
}
@Override
public SignalContainerResponseProto signalToContainer(RpcController arg0,
SignalContainerRequestProto proto) throws ServiceException {
final SignalContainerRequestPBImpl request =
new SignalContainerRequestPBImpl(proto);
try {
final SignalContainerResponse response = real.signalToContainer(request);
return ((SignalContainerResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -35,6 +35,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -174,5 +176,13 @@ public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
return null;
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
final Exception e = new Exception("Dummy function", new Exception(
"Dummy function cause"));
throw new YarnException(e);
}
}
}

View File

@ -29,6 +29,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@ -158,5 +160,11 @@ public IncreaseContainersResourceResponse increaseContainersResource(
}
throw new YarnException("Shouldn't happen!!");
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
return null;
}
}
}

View File

@ -38,6 +38,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -227,6 +229,14 @@ public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException, IOException {
return null;
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException {
final Exception e = new Exception(EXCEPTION_MSG,
new Exception(EXCEPTION_CAUSE));
throw new YarnException(e);
}
}
public static ContainerTokenIdentifier newContainerTokenIdentifier(

View File

@ -61,11 +61,13 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@ -147,6 +149,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerManagerImpl extends CompositeService implements
@ -1348,16 +1351,7 @@ public void handle(ContainerManagerEvent event) {
(CMgrSignalContainersEvent) event;
for (SignalContainerRequest request : containersSignalEvent
.getContainersToSignal()) {
ContainerId containerId = request.getContainerId();
Container container = this.context.getContainers().get(containerId);
if (container != null) {
LOG.info(containerId + " signal request by ResourceManager.");
this.dispatcher.getEventHandler().handle(
new SignalContainersLauncherEvent(container,
request.getCommand()));
} else {
LOG.info("Container " + containerId + " no longer exists");
}
internalSignalToContainer(request, "ResourceManager");
}
break;
default:
@ -1399,4 +1393,27 @@ protected void setAMRMProxyService(AMRMProxyService amrmProxyService) {
this.amrmProxyService = amrmProxyService;
}
@SuppressWarnings("unchecked")
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
internalSignalToContainer(request, "Application Master");
return new SignalContainerResponsePBImpl();
}
@SuppressWarnings("unchecked")
private void internalSignalToContainer(SignalContainerRequest request,
String sentBy) {
ContainerId containerId = request.getContainerId();
Container container = this.context.getContainers().get(containerId);
if (container != null) {
LOG.info(containerId + " signal request " + request.getCommand()
+ " by " + sentBy);
this.dispatcher.getEventHandler().handle(
new SignalContainersLauncherEvent(container,
request.getCommand()));
} else {
LOG.info("Container " + containerId + " no longer exists");
}
}
}

View File

@ -486,7 +486,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
}
@Override
public SignalContainerResponse signalContainer(
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws IOException {
return null;
}

View File

@ -1625,7 +1625,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
* to RMNodeImpl so that the next NM heartbeat will pick up the signal request
*/
@Override
public SignalContainerResponse signalContainer(
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
ContainerId containerId = request.getContainerId();

View File

@ -932,11 +932,11 @@ public RMActiveServices getRMActiveService() {
return activeServices;
}
public void signalContainer(ContainerId containerId, SignalContainerCommand command)
throws Exception {
public void signalToContainer(ContainerId containerId,
SignalContainerCommand command) throws Exception {
ApplicationClientProtocol client = getClientRMService();
SignalContainerRequest req =
SignalContainerRequest.newInstance(containerId, command);
client.signalContainer(req);
client.signalToContainer(req);
}
}

View File

@ -28,13 +28,14 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -317,4 +318,10 @@ public IncreaseContainersResourceResponse increaseContainersResource(
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
return nodeStatus;
}
@Override
public synchronized SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
throw new YarnException("Not supported yet!");
}
}

View File

@ -46,6 +46,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
@ -138,6 +140,12 @@ public Credentials getContainerCredentials() throws IOException {
credentials.readTokenStorageStream(buf);
return credentials;
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
return null;
}
}
public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {

View File

@ -37,6 +37,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -149,6 +151,12 @@ public IncreaseContainersResourceResponse increaseContainersResource(
throws YarnException {
return null;
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
return null;
}
}
@Test

View File

@ -83,7 +83,7 @@ public void testSignalRequestDeliveryToNM() throws Exception {
Assert.assertEquals(request, contReceived);
for(Container container : conts) {
rm.signalContainer(container.getId(),
rm.signalToContainer(container.getId(),
SignalContainerCommand.OUTPUT_THREAD_DUMP);
}