svn merge -c 1361389 FIXES: MAPREDUCE-4419. ./mapred queue -info <queuename> -showJobs displays all the jobs irrespective of <queuename> (Devaraj K via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1361391 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e4f54acd26
commit
3260960be3
|
@ -558,6 +558,9 @@ Release 0.23.3 - UNRELEASED
|
|||
MAPREDUCE-3940. ContainerTokens should have an expiry interval. (Siddharth
|
||||
Seth and Vinod Kumar Vavilapalli via vinodkv)
|
||||
|
||||
MAPREDUCE-4419. ./mapred queue -info <queuename> -showJobs displays all
|
||||
the jobs irrespective of <queuename> (Devaraj K via bobby)
|
||||
|
||||
Release 0.23.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.mapreduce.JobStatus;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
@ -184,7 +185,7 @@ class JobQueueClient extends Configured implements Tool {
|
|||
printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
|
||||
if (showJobs && (jobQueueInfo.getChildren() == null ||
|
||||
jobQueueInfo.getChildren().size() == 0)) {
|
||||
JobStatus[] jobs = jc.getJobsFromQueue(queue);
|
||||
JobStatus[] jobs = jobQueueInfo.getJobStatuses();
|
||||
if (jobs == null)
|
||||
jobs = new JobStatus[0];
|
||||
jc.displayJobList(jobs);
|
||||
|
|
|
@ -238,7 +238,7 @@ public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {
|
|||
stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(),
|
||||
stat.getCleanupProgress(), stat.getState().getValue(),
|
||||
JobPriority.valueOf(stat.getPriority().name()),
|
||||
stat.getUsername(), stat.getJobName(), stat.getJobFile(),
|
||||
stat.getUsername(), stat.getJobName(), stat.getQueue(), stat.getJobFile(),
|
||||
stat.getTrackingUrl(), stat.isUber());
|
||||
old.setStartTime(stat.getStartTime());
|
||||
old.setFinishTime(stat.getFinishTime());
|
||||
|
|
|
@ -389,7 +389,9 @@ public class ClientRMService extends AbstractService implements
|
|||
appReports = new ArrayList<ApplicationReport>(
|
||||
apps.size());
|
||||
for (RMApp app : apps) {
|
||||
appReports.add(app.createAndGetApplicationReport(true));
|
||||
if (app.getQueue().equals(queueInfo.getQueueName())) {
|
||||
appReports.add(app.createAndGetApplicationReport(true));
|
||||
}
|
||||
}
|
||||
}
|
||||
queueInfo.setApplications(appReports);
|
||||
|
|
|
@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -34,13 +37,21 @@ import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -49,6 +60,9 @@ public class TestClientRMService {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(TestClientRMService.class);
|
||||
|
||||
private RecordFactory recordFactory = RecordFactoryProvider
|
||||
.getRecordFactory(null);
|
||||
|
||||
@Test
|
||||
public void testGetClusterNodes() throws Exception {
|
||||
MockRM rm = new MockRM() {
|
||||
|
@ -109,4 +123,66 @@ public class TestClientRMService {
|
|||
Assert.assertNull("It should return null as application report for absent application.",
|
||||
applicationReport.getApplicationReport());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetQueueInfo() throws Exception {
|
||||
YarnScheduler yarnScheduler = mock(YarnScheduler.class);
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
mockRMContext(yarnScheduler, rmContext);
|
||||
ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler,
|
||||
null, null, null);
|
||||
GetQueueInfoRequest request = recordFactory
|
||||
.newRecordInstance(GetQueueInfoRequest.class);
|
||||
request.setQueueName("testqueue");
|
||||
request.setIncludeApplications(true);
|
||||
GetQueueInfoResponse queueInfo = rmService.getQueueInfo(request);
|
||||
List<ApplicationReport> applications = queueInfo.getQueueInfo()
|
||||
.getApplications();
|
||||
Assert.assertEquals(2, applications.size());
|
||||
}
|
||||
|
||||
private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
|
||||
throws IOException {
|
||||
Dispatcher dispatcher = mock(Dispatcher.class);
|
||||
when(rmContext.getDispatcher()).thenReturn(dispatcher);
|
||||
QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class);
|
||||
queInfo.setQueueName("testqueue");
|
||||
when(yarnScheduler.getQueueInfo(anyString(), anyBoolean(), anyBoolean()))
|
||||
.thenReturn(queInfo);
|
||||
ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
|
||||
yarnScheduler);
|
||||
when(rmContext.getRMApps()).thenReturn(apps);
|
||||
}
|
||||
|
||||
private ConcurrentHashMap<ApplicationId, RMApp> getRMApps(
|
||||
RMContext rmContext, YarnScheduler yarnScheduler) {
|
||||
ConcurrentHashMap<ApplicationId, RMApp> apps =
|
||||
new ConcurrentHashMap<ApplicationId, RMApp>();
|
||||
ApplicationId applicationId1 = getApplicationId(1);
|
||||
ApplicationId applicationId2 = getApplicationId(2);
|
||||
ApplicationId applicationId3 = getApplicationId(3);
|
||||
YarnConfiguration config = new YarnConfiguration();
|
||||
apps.put(applicationId1, getRMApp(rmContext, yarnScheduler, applicationId1,
|
||||
config, "testqueue"));
|
||||
apps.put(applicationId2, getRMApp(rmContext, yarnScheduler, applicationId2,
|
||||
config, "a"));
|
||||
apps.put(applicationId3, getRMApp(rmContext, yarnScheduler, applicationId3,
|
||||
config, "testqueue"));
|
||||
return apps;
|
||||
}
|
||||
|
||||
private ApplicationId getApplicationId(int id) {
|
||||
ApplicationId applicationId = recordFactory
|
||||
.newRecordInstance(ApplicationId.class);
|
||||
applicationId.setClusterTimestamp(123456);
|
||||
applicationId.setId(id);
|
||||
return applicationId;
|
||||
}
|
||||
|
||||
private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
|
||||
ApplicationId applicationId3, YarnConfiguration config, String queueName) {
|
||||
return new RMAppImpl(applicationId3, rmContext, config, null, null,
|
||||
queueName, null, null, null, yarnScheduler, null, System
|
||||
.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue