MAPREDUCE-3054. Unable to kill submitted jobs. (mahadev) - Merging r1176600 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1176604 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2011-09-27 20:36:20 +00:00
parent 7eafb4d60e
commit 8203f69ebc
22 changed files with 403 additions and 193 deletions

View File

@ -1420,6 +1420,8 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3095. fairscheduler ivy including wrong version for hdfs. MAPREDUCE-3095. fairscheduler ivy including wrong version for hdfs.
(John George via mahadev) (John George via mahadev)
MAPREDUCE-3054. Unable to kill submitted jobs. (mahadev)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1,20 +1,20 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
@ -46,25 +46,25 @@ public class ClientCache {
private static final Log LOG = LogFactory.getLog(ClientCache.class); private static final Log LOG = LogFactory.getLog(ClientCache.class);
private Map<JobID, ClientServiceDelegate> cache = private Map<JobID, ClientServiceDelegate> cache =
new HashMap<JobID, ClientServiceDelegate>(); new HashMap<JobID, ClientServiceDelegate>();
private MRClientProtocol hsProxy; private MRClientProtocol hsProxy;
ClientCache(Configuration conf, ResourceMgrDelegate rm) { public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
this.conf = conf; this.conf = conf;
this.rm = rm; this.rm = rm;
} }
//TODO: evict from the cache on some threshold //TODO: evict from the cache on some threshold
synchronized ClientServiceDelegate getClient(JobID jobId) { public synchronized ClientServiceDelegate getClient(JobID jobId) {
if (hsProxy == null) { if (hsProxy == null) {
try { try {
hsProxy = instantiateHistoryProxy(); hsProxy = instantiateHistoryProxy();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Could not connect to History server.", e); LOG.warn("Could not connect to History server.", e);
throw new YarnException("Could not connect to History server.", e); throw new YarnException("Could not connect to History server.", e);
} }
} }
ClientServiceDelegate client = cache.get(jobId); ClientServiceDelegate client = cache.get(jobId);
if (client == null) { if (client == null) {
client = new ClientServiceDelegate(conf, rm, jobId, hsProxy); client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
@ -74,7 +74,7 @@ public class ClientCache {
} }
private MRClientProtocol instantiateHistoryProxy() private MRClientProtocol instantiateHistoryProxy()
throws IOException { throws IOException {
final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS); final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
if (StringUtils.isEmpty(serviceAddr)) { if (StringUtils.isEmpty(serviceAddr)) {
return null; return null;

View File

@ -70,7 +70,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo; import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
class ClientServiceDelegate { public class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class); private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
// Caches for per-user NotRunningJobs // Caches for per-user NotRunningJobs
@ -87,7 +87,7 @@ class ClientServiceDelegate {
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static String UNKNOWN_USER = "Unknown User"; private static String UNKNOWN_USER = "Unknown User";
ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
JobID jobId, MRClientProtocol historyServerProxy) { JobID jobId, MRClientProtocol historyServerProxy) {
this.conf = new Configuration(conf); // Cloning for modifying. this.conf = new Configuration(conf); // Cloning for modifying.
// For faster redirects from AM to HS. // For faster redirects from AM to HS.
@ -279,7 +279,7 @@ class ClientServiceDelegate {
} }
} }
org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException, public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
InterruptedException { InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0); org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class); GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
@ -290,7 +290,7 @@ class ClientServiceDelegate {
} }
TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2) public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
throws IOException, InterruptedException { throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
.toYarn(arg0); .toYarn(arg0);
@ -308,7 +308,7 @@ class ClientServiceDelegate {
.toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0])); .toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
} }
String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0) public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
throws IOException, InterruptedException { throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter
@ -326,7 +326,7 @@ class ClientServiceDelegate {
return result; return result;
} }
JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException { public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID); TypeConverter.toYarn(oldJobID);
GetJobReportRequest request = GetJobReportRequest request =
@ -339,7 +339,7 @@ class ClientServiceDelegate {
return TypeConverter.fromYarn(report, jobFile); return TypeConverter.fromYarn(report, jobFile);
} }
org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType) public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
throws YarnRemoteException, YarnRemoteException { throws YarnRemoteException, YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID); TypeConverter.toYarn(oldJobID);
@ -356,7 +356,7 @@ class ClientServiceDelegate {
(taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]); (taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
} }
boolean killTask(TaskAttemptID taskAttemptID, boolean fail) public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
throws YarnRemoteException { throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= TypeConverter.toYarn(taskAttemptID); = TypeConverter.toYarn(taskAttemptID);
@ -372,7 +372,7 @@ class ClientServiceDelegate {
return true; return true;
} }
boolean killJob(JobID oldJobID) public boolean killJob(JobID oldJobID)
throws YarnRemoteException { throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
= TypeConverter.toYarn(oldJobID); = TypeConverter.toYarn(oldJobID);

View File

@ -44,7 +44,7 @@ import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -79,6 +79,10 @@ public class ResourceMgrDelegate {
private ApplicationId applicationId; private ApplicationId applicationId;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
/**
* Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
* @param conf the configuration object.
*/
public ResourceMgrDelegate(YarnConfiguration conf) { public ResourceMgrDelegate(YarnConfiguration conf) {
this.conf = conf; this.conf = conf;
YarnRPC rpc = YarnRPC.create(this.conf); YarnRPC rpc = YarnRPC.create(this.conf);
@ -97,6 +101,16 @@ public class ResourceMgrDelegate {
LOG.info("Connected to ResourceManager at " + rmAddress); LOG.info("Connected to ResourceManager at " + rmAddress);
} }
/**
* Used for injecting applicationsManager, mostly for testing.
* @param conf the configuration object
* @param applicationsManager the handle to talk the resource managers {@link ClientRMProtocol}.
*/
public ResourceMgrDelegate(YarnConfiguration conf, ClientRMProtocol applicationsManager) {
this.conf = conf;
this.applicationsManager = applicationsManager;
}
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0) public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
throws IOException, InterruptedException { throws IOException, InterruptedException {
return; return;
@ -294,9 +308,9 @@ public class ResourceMgrDelegate {
} }
public void killApplication(ApplicationId applicationId) throws IOException { public void killApplication(ApplicationId applicationId) throws IOException {
FinishApplicationRequest request = recordFactory.newRecordInstance(FinishApplicationRequest.class); KillApplicationRequest request = recordFactory.newRecordInstance(KillApplicationRequest.class);
request.setApplicationId(applicationId); request.setApplicationId(applicationId);
applicationsManager.finishApplication(request); applicationsManager.forceKillApplication(request);
LOG.info("Killing application " + applicationId); LOG.info("Killing application " + applicationId);
} }

View File

@ -105,10 +105,22 @@ public class YARNRunner implements ClientProtocol {
* @param resMgrDelegate the resourcemanager client handle. * @param resMgrDelegate the resourcemanager client handle.
*/ */
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) { public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
}
/**
* Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
* but allowing injecting {@link ClientCache}. Enable mocking and testing.
* @param conf the configuration object
* @param resMgrDelegate the resource manager delegate
* @param clientCache the client cache object.
*/
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
ClientCache clientCache) {
this.conf = conf; this.conf = conf;
try { try {
this.resMgrDelegate = resMgrDelegate; this.resMgrDelegate = resMgrDelegate;
this.clientCache = new ClientCache(this.conf, resMgrDelegate); this.clientCache = clientCache;
this.defaultFileContext = FileContext.getFileContext(this.conf); this.defaultFileContext = FileContext.getFileContext(this.conf);
} catch (UnsupportedFileSystemException ufe) { } catch (UnsupportedFileSystemException ufe) {
throw new RuntimeException("Error in instantiating YarnClient", ufe); throw new RuntimeException("Error in instantiating YarnClient", ufe);
@ -429,9 +441,35 @@ public class YARNRunner implements ClientProtocol {
@Override @Override
public void killJob(JobID arg0) throws IOException, InterruptedException { public void killJob(JobID arg0) throws IOException, InterruptedException {
if (!clientCache.getClient(arg0).killJob(arg0)) { /* check if the status is not running, if not send kill to RM */
resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
} if (status.getState() != JobStatus.State.RUNNING) {
resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
return;
}
try {
/* send a kill to the AM */
clientCache.getClient(arg0).killJob(arg0);
long currentTimeMillis = System.currentTimeMillis();
long timeKillIssued = currentTimeMillis;
while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
!= JobStatus.State.KILLED)) {
try {
Thread.sleep(1000L);
} catch(InterruptedException ie) {
/** interrupted, just break */
break;
}
currentTimeMillis = System.currentTimeMillis();
status = clientCache.getClient(arg0).getJobStatus(arg0);
}
} catch(IOException io) {
LOG.debug("Error when checking for application status", io);
}
if (status.getState() != JobStatus.State.KILLED) {
resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
}
} }
@Override @Override

View File

@ -68,8 +68,8 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -288,9 +288,9 @@ public class TestClientRedirect {
} }
@Override @Override
public FinishApplicationResponse finishApplication( public KillApplicationResponse forceKillApplication(
FinishApplicationRequest request) throws YarnRemoteException { KillApplicationRequest request) throws YarnRemoteException {
return null; return recordFactory.newRecordInstance(KillApplicationResponse.class);
} }
@Override @Override
@ -451,7 +451,7 @@ public class TestClientRedirect {
@Override @Override
public KillJobResponse killJob(KillJobRequest request) public KillJobResponse killJob(KillJobRequest request)
throws YarnRemoteException { throws YarnRemoteException {
return null; return recordFactory.newRecordInstance(KillJobResponse.class);
} }
@Override @Override

View File

@ -22,6 +22,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
@ -36,15 +37,38 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClientCache;
import org.apache.hadoop.mapred.ClientServiceDelegate;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.ResourceMgrDelegate; import org.apache.hadoop.mapred.ResourceMgrDelegate;
import org.apache.hadoop.mapred.YARNRunner; import org.apache.hadoop.mapred.YARNRunner;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationState; import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -54,9 +78,8 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
/** /**
* Test if the jobclient shows enough diagnostics * Test YarnRunner and make sure the client side plugin works
* on a job failure. * fine
*
*/ */
public class TestYARNRunner extends TestCase { public class TestYARNRunner extends TestCase {
private static final Log LOG = LogFactory.getLog(TestYARNRunner.class); private static final Log LOG = LogFactory.getLog(TestYARNRunner.class);
@ -65,18 +88,22 @@ public class TestYARNRunner extends TestCase {
private YARNRunner yarnRunner; private YARNRunner yarnRunner;
private ResourceMgrDelegate resourceMgrDelegate; private ResourceMgrDelegate resourceMgrDelegate;
private YarnConfiguration conf; private YarnConfiguration conf;
private ClientCache clientCache;
private ApplicationId appId; private ApplicationId appId;
private JobID jobId; private JobID jobId;
private File testWorkDir = private File testWorkDir =
new File("target", TestYARNRunner.class.getName()); new File("target", TestYARNRunner.class.getName());
private ApplicationSubmissionContext submissionContext; private ApplicationSubmissionContext submissionContext;
private ClientServiceDelegate clientDelegate;
private static final String failString = "Rejected job"; private static final String failString = "Rejected job";
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
resourceMgrDelegate = mock(ResourceMgrDelegate.class); resourceMgrDelegate = mock(ResourceMgrDelegate.class);
conf = new YarnConfiguration(); conf = new YarnConfiguration();
yarnRunner = new YARNRunner(conf, resourceMgrDelegate); clientCache = new ClientCache(conf, resourceMgrDelegate);
clientCache = spy(clientCache);
yarnRunner = new YARNRunner(conf, resourceMgrDelegate, clientCache);
yarnRunner = spy(yarnRunner); yarnRunner = spy(yarnRunner);
submissionContext = mock(ApplicationSubmissionContext.class); submissionContext = mock(ApplicationSubmissionContext.class);
doAnswer( doAnswer(
@ -101,6 +128,31 @@ public class TestYARNRunner extends TestCase {
} }
@Test
public void testJobKill() throws Exception {
clientDelegate = mock(ClientServiceDelegate.class);
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
State.PREP, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
when(clientDelegate.killJob(any(JobID.class))).thenReturn(true);
doAnswer(
new Answer<ClientServiceDelegate>() {
@Override
public ClientServiceDelegate answer(InvocationOnMock invocation)
throws Throwable {
return clientDelegate;
}
}
).when(clientCache).getClient(any(JobID.class));
yarnRunner.killJob(jobId);
verify(resourceMgrDelegate).killApplication(appId);
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
yarnRunner.killJob(jobId);
verify(clientDelegate).killJob(jobId);
}
@Test @Test
public void testJobSubmissionFailure() throws Exception { public void testJobSubmissionFailure() throws Exception {
when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))). when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
@ -122,4 +174,66 @@ public class TestYARNRunner extends TestCase {
assertTrue(io.getLocalizedMessage().contains(failString)); assertTrue(io.getLocalizedMessage().contains(failString));
} }
} }
@Test
public void testResourceMgrDelegate() throws Exception {
/* we not want a mock of resourcemgr deleagte */
ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf, clientRMProtocol);
/* make sure kill calls finish application master */
when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
.thenReturn(null);
delegate.killApplication(appId);
verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class));
/* make sure getalljobs calls get all applications */
when(clientRMProtocol.getAllApplications(any(GetAllApplicationsRequest.class))).
thenReturn(recordFactory.newRecordInstance(GetAllApplicationsResponse.class));
delegate.getAllJobs();
verify(clientRMProtocol).getAllApplications(any(GetAllApplicationsRequest.class));
/* make sure getapplication report is called */
when(clientRMProtocol.getApplicationReport(any(GetApplicationReportRequest.class)))
.thenReturn(recordFactory.newRecordInstance(GetApplicationReportResponse.class));
delegate.getApplicationReport(appId);
verify(clientRMProtocol).getApplicationReport(any(GetApplicationReportRequest.class));
/* make sure metrics is called */
GetClusterMetricsResponse clusterMetricsResponse = recordFactory.newRecordInstance
(GetClusterMetricsResponse.class);
clusterMetricsResponse.setClusterMetrics(recordFactory.newRecordInstance(
YarnClusterMetrics.class));
when(clientRMProtocol.getClusterMetrics(any(GetClusterMetricsRequest.class)))
.thenReturn(clusterMetricsResponse);
delegate.getClusterMetrics();
verify(clientRMProtocol).getClusterMetrics(any(GetClusterMetricsRequest.class));
when(clientRMProtocol.getClusterNodes(any(GetClusterNodesRequest.class))).
thenReturn(recordFactory.newRecordInstance(GetClusterNodesResponse.class));
delegate.getActiveTrackers();
verify(clientRMProtocol).getClusterNodes(any(GetClusterNodesRequest.class));
GetNewApplicationIdResponse newAppIdResponse = recordFactory.newRecordInstance(
GetNewApplicationIdResponse.class);
newAppIdResponse.setApplicationId(appId);
when(clientRMProtocol.getNewApplicationId(any(GetNewApplicationIdRequest.class))).
thenReturn(newAppIdResponse);
delegate.getNewJobID();
verify(clientRMProtocol).getNewApplicationId(any(GetNewApplicationIdRequest.class));
GetQueueInfoResponse queueInfoResponse = recordFactory.newRecordInstance(
GetQueueInfoResponse.class);
queueInfoResponse.setQueueInfo(recordFactory.newRecordInstance(QueueInfo.class));
when(clientRMProtocol.getQueueInfo(any(GetQueueInfoRequest.class))).
thenReturn(queueInfoResponse);
delegate.getQueues();
verify(clientRMProtocol).getQueueInfo(any(GetQueueInfoRequest.class));
GetQueueUserAclsInfoResponse aclResponse = recordFactory.newRecordInstance(
GetQueueUserAclsInfoResponse.class);
when(clientRMProtocol.getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)))
.thenReturn(aclResponse);
delegate.getQueueAclsForCurrentUser();
verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
}
} }

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.api;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -102,7 +102,7 @@ public interface ClientRMProtocol {
* <p>The interface used by clients to request the * <p>The interface used by clients to request the
* <code>ResourceManager</code> to abort submitted application.</p> * <code>ResourceManager</code> to abort submitted application.</p>
* *
* <p>The client, via {@link FinishApplicationRequest} provides the * <p>The client, via {@link KillApplicationRequest} provides the
* {@link ApplicationId} of the application to be aborted.</p> * {@link ApplicationId} of the application to be aborted.</p>
* *
* <p> In secure mode,the <code>ResourceManager</code> verifies access to the * <p> In secure mode,the <code>ResourceManager</code> verifies access to the
@ -117,8 +117,8 @@ public interface ClientRMProtocol {
* @throws YarnRemoteException * @throws YarnRemoteException
* @see #getQueueUserAcls(GetQueueUserAclsInfoRequest) * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest)
*/ */
public FinishApplicationResponse finishApplication( public KillApplicationResponse forceKillApplication(
FinishApplicationRequest request) KillApplicationRequest request)
throws YarnRemoteException; throws YarnRemoteException;
/** /**

View File

@ -32,11 +32,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
* <p>The request includes the {@link ApplicationId} of the application to be * <p>The request includes the {@link ApplicationId} of the application to be
* aborted.</p> * aborted.</p>
* *
* @see ClientRMProtocol#finishApplication(FinishApplicationRequest) * @see ClientRMProtocol#forceKillApplication(KillApplicationRequest)
*/ */
@Public @Public
@Stable @Stable
public interface FinishApplicationRequest { public interface KillApplicationRequest {
/** /**
* Get the <code>ApplicationId</code> of the application to be aborted. * Get the <code>ApplicationId</code> of the application to be aborted.
* @return <code>ApplicationId</code> of the application to be aborted * @return <code>ApplicationId</code> of the application to be aborted

View File

@ -28,10 +28,10 @@ import org.apache.hadoop.yarn.api.ClientRMProtocol;
* *
* <p>Currently it's empty.</p> * <p>Currently it's empty.</p>
* *
* @see ClientRMProtocol#finishApplication(FinishApplicationRequest) * @see ClientRMProtocol#forceKillApplication(KillApplicationRequest)
*/ */
@Public @Public
@Stable @Stable
public interface FinishApplicationResponse { public interface KillApplicationResponse {
} }

View File

@ -19,34 +19,34 @@
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProtoOrBuilder;
public class FinishApplicationRequestPBImpl extends ProtoBase<FinishApplicationRequestProto> implements FinishApplicationRequest { public class KillApplicationRequestPBImpl extends ProtoBase<KillApplicationRequestProto> implements KillApplicationRequest {
FinishApplicationRequestProto proto = FinishApplicationRequestProto.getDefaultInstance(); KillApplicationRequestProto proto = KillApplicationRequestProto.getDefaultInstance();
FinishApplicationRequestProto.Builder builder = null; KillApplicationRequestProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private ApplicationId applicationId = null; private ApplicationId applicationId = null;
public FinishApplicationRequestPBImpl() { public KillApplicationRequestPBImpl() {
builder = FinishApplicationRequestProto.newBuilder(); builder = KillApplicationRequestProto.newBuilder();
} }
public FinishApplicationRequestPBImpl(FinishApplicationRequestProto proto) { public KillApplicationRequestPBImpl(KillApplicationRequestProto proto) {
this.proto = proto; this.proto = proto;
viaProto = true; viaProto = true;
} }
public FinishApplicationRequestProto getProto() { public KillApplicationRequestProto getProto() {
mergeLocalToProto(); mergeLocalToProto();
proto = viaProto ? proto : builder.build(); proto = viaProto ? proto : builder.build();
viaProto = true; viaProto = true;
@ -69,7 +69,7 @@ public class FinishApplicationRequestPBImpl extends ProtoBase<FinishApplicationR
private void maybeInitBuilder() { private void maybeInitBuilder() {
if (viaProto || builder == null) { if (viaProto || builder == null) {
builder = FinishApplicationRequestProto.newBuilder(proto); builder = KillApplicationRequestProto.newBuilder(proto);
} }
viaProto = false; viaProto = false;
} }
@ -77,7 +77,7 @@ public class FinishApplicationRequestPBImpl extends ProtoBase<FinishApplicationR
@Override @Override
public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
FinishApplicationRequestProtoOrBuilder p = viaProto ? proto : builder; KillApplicationRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.applicationId != null) { if (this.applicationId != null) {
return this.applicationId; return this.applicationId;
} }

View File

@ -19,27 +19,27 @@
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
public class FinishApplicationResponsePBImpl extends ProtoBase<FinishApplicationResponseProto> implements FinishApplicationResponse { public class KillApplicationResponsePBImpl extends ProtoBase<KillApplicationResponseProto> implements KillApplicationResponse {
FinishApplicationResponseProto proto = FinishApplicationResponseProto.getDefaultInstance(); KillApplicationResponseProto proto = KillApplicationResponseProto.getDefaultInstance();
FinishApplicationResponseProto.Builder builder = null; KillApplicationResponseProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
public FinishApplicationResponsePBImpl() { public KillApplicationResponsePBImpl() {
builder = FinishApplicationResponseProto.newBuilder(); builder = KillApplicationResponseProto.newBuilder();
} }
public FinishApplicationResponsePBImpl(FinishApplicationResponseProto proto) { public KillApplicationResponsePBImpl(KillApplicationResponseProto proto) {
this.proto = proto; this.proto = proto;
viaProto = true; viaProto = true;
} }
public FinishApplicationResponseProto getProto() { public KillApplicationResponseProto getProto() {
proto = viaProto ? proto : builder.build(); proto = viaProto ? proto : builder.build();
viaProto = true; viaProto = true;
return proto; return proto;
@ -47,7 +47,7 @@ public class FinishApplicationResponsePBImpl extends ProtoBase<FinishApplication
private void maybeInitBuilder() { private void maybeInitBuilder() {
if (viaProto || builder == null) { if (viaProto || builder == null) {
builder = FinishApplicationResponseProto.newBuilder(proto); builder = KillApplicationResponseProto.newBuilder(proto);
} }
viaProto = false; viaProto = false;
} }

View File

@ -27,7 +27,7 @@ service ClientRMProtocolService {
rpc getNewApplicationId (GetNewApplicationIdRequestProto) returns (GetNewApplicationIdResponseProto); rpc getNewApplicationId (GetNewApplicationIdRequestProto) returns (GetNewApplicationIdResponseProto);
rpc getApplicationReport (GetApplicationReportRequestProto) returns (GetApplicationReportResponseProto); rpc getApplicationReport (GetApplicationReportRequestProto) returns (GetApplicationReportResponseProto);
rpc submitApplication (SubmitApplicationRequestProto) returns (SubmitApplicationResponseProto); rpc submitApplication (SubmitApplicationRequestProto) returns (SubmitApplicationResponseProto);
rpc finishApplication (FinishApplicationRequestProto) returns (FinishApplicationResponseProto); rpc forceKillApplication (KillApplicationRequestProto) returns (KillApplicationResponseProto);
rpc getClusterMetrics (GetClusterMetricsRequestProto) returns (GetClusterMetricsResponseProto); rpc getClusterMetrics (GetClusterMetricsRequestProto) returns (GetClusterMetricsResponseProto);
rpc getAllApplications (GetAllApplicationsRequestProto) returns (GetAllApplicationsResponseProto); rpc getAllApplications (GetAllApplicationsRequestProto) returns (GetAllApplicationsResponseProto);
rpc getClusterNodes (GetClusterNodesRequestProto) returns (GetClusterNodesResponseProto); rpc getClusterNodes (GetClusterNodesRequestProto) returns (GetClusterNodesResponseProto);

View File

@ -88,11 +88,11 @@ message SubmitApplicationRequestProto {
message SubmitApplicationResponseProto { message SubmitApplicationResponseProto {
} }
message FinishApplicationRequestProto { message KillApplicationRequestProto {
optional ApplicationIdProto application_id = 1; optional ApplicationIdProto application_id = 1;
} }
message FinishApplicationResponseProto { message KillApplicationResponseProto {
} }
message GetClusterMetricsRequestProto { message GetClusterMetricsRequestProto {

View File

@ -25,8 +25,6 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -41,10 +39,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
@ -59,21 +57,22 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoRequestPBI
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine; import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine;
import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService; import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterMetricsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetClusterNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationIdRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationIdRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -88,11 +87,11 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol {
} }
@Override @Override
public FinishApplicationResponse finishApplication( public KillApplicationResponse forceKillApplication(
FinishApplicationRequest request) throws YarnRemoteException { KillApplicationRequest request) throws YarnRemoteException {
FinishApplicationRequestProto requestProto = ((FinishApplicationRequestPBImpl)request).getProto(); KillApplicationRequestProto requestProto = ((KillApplicationRequestPBImpl)request).getProto();
try { try {
return new FinishApplicationResponsePBImpl(proxy.finishApplication(null, requestProto)); return new KillApplicationResponsePBImpl(proxy.forceKillApplication(null, requestProto));
} catch (ServiceException e) { } catch (ServiceException e) {
if (e.getCause() instanceof YarnRemoteException) { if (e.getCause() instanceof YarnRemoteException) {
throw (YarnRemoteException)e.getCause(); throw (YarnRemoteException)e.getCause();

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.api.impl.pb.service; package org.apache.hadoop.yarn.api.impl.pb.service;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
@ -27,9 +26,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetAllApplicationsResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationReportRequestPBImpl;
@ -44,12 +42,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoRequestPBI
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueInfoResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetQueueUserAclsInfoResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.KillApplicationResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService.BlockingInterface; import org.apache.hadoop.yarn.proto.ClientRMProtocol.ClientRMProtocolService.BlockingInterface;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetAllApplicationsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationReportRequestProto;
@ -64,6 +62,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueInfoResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetQueueUserAclsInfoResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
@ -79,12 +79,12 @@ public class ClientRMProtocolPBServiceImpl implements BlockingInterface {
} }
@Override @Override
public FinishApplicationResponseProto finishApplication(RpcController arg0, public KillApplicationResponseProto forceKillApplication(RpcController arg0,
FinishApplicationRequestProto proto) throws ServiceException { KillApplicationRequestProto proto) throws ServiceException {
FinishApplicationRequestPBImpl request = new FinishApplicationRequestPBImpl(proto); KillApplicationRequestPBImpl request = new KillApplicationRequestPBImpl(proto);
try { try {
FinishApplicationResponse response = real.finishApplication(request); KillApplicationResponse response = real.forceKillApplication(request);
return ((FinishApplicationResponsePBImpl)response).getProto(); return ((KillApplicationResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }

View File

@ -90,7 +90,7 @@ public class TestRPC {
.newRecord(GetNewApplicationIdRequest.class)); .newRecord(GetNewApplicationIdRequest.class));
Assert.fail("Excepted RPC call to fail with unknown method."); Assert.fail("Excepted RPC call to fail with unknown method.");
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
Assert.assertEquals("Unknown method getNewApplicationId called on " Assert.assertEquals("Unknown method getNewApplicationId called on interface "
+ "org.apache.hadoop.yarn.proto.ClientRMProtocol" + "org.apache.hadoop.yarn.proto.ClientRMProtocol"
+ "$ClientRMProtocolService$BlockingInterface protocol.", e + "$ClientRMProtocolService$BlockingInterface protocol.", e
.getMessage()); .getMessage());

View File

@ -36,8 +36,8 @@ import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@ -228,8 +228,8 @@ public class ClientRMService extends AbstractService implements
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public FinishApplicationResponse finishApplication( public KillApplicationResponse forceKillApplication(
FinishApplicationRequest request) throws YarnRemoteException { KillApplicationRequest request) throws YarnRemoteException {
ApplicationId applicationId = request.getApplicationId(); ApplicationId applicationId = request.getApplicationId();
@ -262,8 +262,8 @@ public class ClientRMService extends AbstractService implements
RMAuditLogger.logSuccess(callerUGI.getShortUserName(), RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId); AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId);
FinishApplicationResponse response = recordFactory KillApplicationResponse response = recordFactory
.newRecordInstance(FinishApplicationResponse.class); .newRecordInstance(KillApplicationResponse.class);
return response; return response;
} }

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@ -86,7 +87,8 @@ public class RMAppImpl implements RMApp {
private long startTime; private long startTime;
private long finishTime; private long finishTime;
private RMAppAttempt currentAttempt; private RMAppAttempt currentAttempt;
@SuppressWarnings("rawtypes")
private EventHandler handler;
private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
private static final StateMachineFactory<RMAppImpl, private static final StateMachineFactory<RMAppImpl,
@ -99,9 +101,6 @@ public class RMAppImpl implements RMApp {
RMAppEvent>(RMAppState.NEW) RMAppEvent>(RMAppState.NEW)
// TODO - ATTEMPT_KILLED not sent right now but should handle if
// attempt starts sending
// Transitions from NEW state // Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.SUBMITTED, .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
RMAppEventType.START, new StartAppAttemptTransition()) RMAppEventType.START, new StartAppAttemptTransition())
@ -116,7 +115,7 @@ public class RMAppImpl implements RMApp {
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED) RMAppEventType.APP_ACCEPTED)
.addTransition(RMAppState.SUBMITTED, RMAppState.KILLED, .addTransition(RMAppState.SUBMITTED, RMAppState.KILLED,
RMAppEventType.KILL, new AppKilledTransition()) RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from ACCEPTED state // Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
@ -126,7 +125,7 @@ public class RMAppImpl implements RMApp {
RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED)) new AttemptFailedTransition(RMAppState.SUBMITTED))
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLED, .addTransition(RMAppState.ACCEPTED, RMAppState.KILLED,
RMAppEventType.KILL, new AppKilledTransition()) RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from RUNNING state // Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED, .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
@ -136,7 +135,7 @@ public class RMAppImpl implements RMApp {
RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED)) new AttemptFailedTransition(RMAppState.SUBMITTED))
.addTransition(RMAppState.RUNNING, RMAppState.KILLED, .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
RMAppEventType.KILL, new AppKilledTransition()) RMAppEventType.KILL, new KillAppAndAttemptTransition())
// Transitions from FINISHED state // Transitions from FINISHED state
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED, .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
@ -168,6 +167,7 @@ public class RMAppImpl implements RMApp {
this.name = name; this.name = name;
this.rmContext = rmContext; this.rmContext = rmContext;
this.dispatcher = rmContext.getDispatcher(); this.dispatcher = rmContext.getDispatcher();
this.handler = dispatcher.getEventHandler();
this.conf = config; this.conf = config;
this.user = user; this.user = user;
this.queue = queue; this.queue = queue;
@ -403,7 +403,7 @@ public class RMAppImpl implements RMApp {
submissionContext); submissionContext);
attempts.put(appAttemptId, attempt); attempts.put(appAttemptId, attempt);
currentAttempt = attempt; currentAttempt = attempt;
dispatcher.getEventHandler().handle( handler.handle(
new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START)); new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
} }
@ -420,13 +420,23 @@ public class RMAppImpl implements RMApp {
}; };
} }
private static final class AppKilledTransition extends FinalTransition { private static class AppKilledTransition extends FinalTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
app.diagnostics.append("Application killed by user."); app.diagnostics.append("Application killed by user.");
super.transition(app, event); super.transition(app, event);
}; };
} }
private static class KillAppAndAttemptTransition extends AppKilledTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL));
super.transition(app, event);
}
}
private static final class AppRejectedTransition extends private static final class AppRejectedTransition extends
FinalTransition{ FinalTransition{
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
@ -450,11 +460,11 @@ public class RMAppImpl implements RMApp {
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
Set<NodeId> nodes = getNodesOnWhichAttemptRan(app); Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
for (NodeId nodeId : nodes) { for (NodeId nodeId : nodes) {
app.dispatcher.getEventHandler().handle( app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId)); new RMNodeCleanAppEvent(nodeId, app.applicationId));
} }
app.finishTime = System.currentTimeMillis(); app.finishTime = System.currentTimeMillis();
app.dispatcher.getEventHandler().handle( app.handler.handle(
new RMAppManagerEvent(app.applicationId, new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED)); RMAppManagerEventType.APP_COMPLETED));
}; };

View File

@ -22,7 +22,7 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@ -109,9 +109,9 @@ public class MockRM extends ResourceManager {
public void killApp(ApplicationId appId) throws Exception { public void killApp(ApplicationId appId) throws Exception {
ClientRMProtocol client = getClientRMService(); ClientRMProtocol client = getClientRMService();
FinishApplicationRequest req = Records.newRecord(FinishApplicationRequest.class); KillApplicationRequest req = Records.newRecord(KillApplicationRequest.class);
req.setApplicationId(appId); req.setApplicationId(appId);
client.finishApplication(req); client.forceKillApplication(req);
} }
//from AMLauncher //from AMLauncher

View File

@ -1,52 +1,57 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@Private
public class InlineDispatcher extends AsyncDispatcher { public class InlineDispatcher extends AsyncDispatcher {
private class InlineEventHandler implements EventHandler { private static final Log LOG = LogFactory.getLog(InlineDispatcher.class);
private final InlineDispatcher dispatcher;
public InlineEventHandler(InlineDispatcher dispatcher) { private class TestEventHandler implements EventHandler {
this.dispatcher = dispatcher;
}
@Override @Override
public void handle(Event event) { public void handle(Event event) {
this.dispatcher.dispatch(event); dispatch(event);
} }
} }
public void dispatch(Event event) { @Override
super.dispatch(event); protected void dispatch(Event event) {
LOG.info("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
Class<? extends Enum> type = event.getType().getDeclaringClass();
if (eventDispatchers.get(type) != null) {
eventDispatchers.get(type).handle(event);
}
} }
@Override @Override
public EventHandler getEventHandler() { public EventHandler getEventHandler() {
return new InlineEventHandler(this); return new TestEventHandler();
} }
static class EmptyEventHandler implements EventHandler<Event> { static class EmptyEventHandler implements EventHandler<Event> {
@Override @Override
public void handle(Event event) { public void handle(Event event) {
; // ignore //do nothing
} }
} }
} }

View File

@ -1,26 +1,27 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.rmapp; package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import junit.framework.Assert; import junit.framework.Assert;
@ -37,12 +38,14 @@ import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@ -52,27 +55,40 @@ import org.junit.Test;
public class TestRMAppTransitions { public class TestRMAppTransitions {
private static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class); static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);
private RMContext rmContext; private RMContext rmContext;
private static int maxRetries = 4; private static int maxRetries = 4;
private static int appId = 1; private static int appId = 1;
private AsyncDispatcher rmDispatcher;
// ignore all the RM application attempt events // ignore all the RM application attempt events
private static final class TestApplicationAttemptEventDispatcher implements private static final class TestApplicationAttemptEventDispatcher implements
EventHandler<RMAppAttemptEvent> { EventHandler<RMAppAttemptEvent> {
public TestApplicationAttemptEventDispatcher() { private final RMContext rmContext;
public TestApplicationAttemptEventDispatcher(RMContext rmContext) {
this.rmContext = rmContext;
} }
@Override @Override
public void handle(RMAppAttemptEvent event) { public void handle(RMAppAttemptEvent event) {
ApplicationId appId = event.getApplicationAttemptId().getApplicationId();
RMApp rmApp = this.rmContext.getRMApps().get(appId);
if (rmApp != null) {
try {
rmApp.getRMAppAttempt(event.getApplicationAttemptId()).handle(event);
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
+ " for application " + appId, t);
}
}
} }
} }
// handle all the RM application events - same as in ResourceManager.java // handle all the RM application events - same as in ResourceManager.java
private static final class TestApplicationEventDispatcher implements private static final class TestApplicationEventDispatcher implements
EventHandler<RMAppEvent> { EventHandler<RMAppEvent> {
private final RMContext rmContext; private final RMContext rmContext;
public TestApplicationEventDispatcher(RMContext rmContext) { public TestApplicationEventDispatcher(RMContext rmContext) {
@ -97,18 +113,22 @@ public class TestRMAppTransitions {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
AsyncDispatcher rmDispatcher = new AsyncDispatcher(); AsyncDispatcher rmDispatcher = new AsyncDispatcher();
Configuration conf = new Configuration();
rmDispatcher = new InlineDispatcher();
ContainerAllocationExpirer containerAllocationExpirer = ContainerAllocationExpirer containerAllocationExpirer =
mock(ContainerAllocationExpirer.class); mock(ContainerAllocationExpirer.class);
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class); AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher, this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor); containerAllocationExpirer, amLivelinessMonitor);
rmDispatcher.register(RMAppAttemptEventType.class, rmDispatcher.register(RMAppAttemptEventType.class,
new TestApplicationAttemptEventDispatcher()); new TestApplicationAttemptEventDispatcher(this.rmContext));
rmDispatcher.register(RMAppEventType.class, rmDispatcher.register(RMAppEventType.class,
new TestApplicationEventDispatcher(rmContext)); new TestApplicationEventDispatcher(rmContext));
rmDispatcher.init(conf);
rmDispatcher.start();
} }
protected RMApp createNewTestApp() { protected RMApp createNewTestApp() {
@ -128,10 +148,10 @@ public class TestRMAppTransitions {
new ApplicationTokenSecretManager(), scheduler); new ApplicationTokenSecretManager(), scheduler);
RMApp application = new RMAppImpl(applicationId, rmContext, RMApp application = new RMAppImpl(applicationId, rmContext,
conf, name, user, conf, name, user,
queue, submissionContext, clientTokenStr, queue, submissionContext, clientTokenStr,
appStore, scheduler, appStore, scheduler,
masterService); masterService);
testAppStartState(applicationId, user, name, queue, application); testAppStartState(applicationId, user, name, queue, application);
return application; return application;
@ -193,6 +213,14 @@ public class TestRMAppTransitions {
"Application killed by user.", diag.toString()); "Application killed by user.", diag.toString());
} }
private static void assertAppAndAttemptKilled(RMApp application) {
assertKilled(application);
/* also check if the attempt is killed */
Assert.assertEquals( RMAppAttemptState.KILLED,
application.getCurrentAppAttempt().getAppAttemptState()
);
}
private static void assertFailed(RMApp application, String regex) { private static void assertFailed(RMApp application, String regex) {
assertTimesAtFinish(application); assertTimesAtFinish(application);
assertAppState(RMAppState.FAILED, application); assertAppState(RMAppState.FAILED, application);
@ -298,10 +326,10 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppAccepted(); RMApp application = testCreateAppAccepted();
// SUBMITTED => KILLED event RMAppEventType.KILL // SUBMITTED => KILLED event RMAppEventType.KILL
RMAppEvent event = RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL); this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), application);
application.handle(event); application.handle(event);
assertKilled(application); assertAppAndAttemptKilled(application);
} }
@Test @Test