MAPREDUCE-5732. Report proper queue when job has been automatically placed (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1562906 13f79535-47bb-0310-9956-ffa450edef68
Sanford Ryza 2014-01-30 17:57:08 +00:00
parent 3a23c563fa
commit 28b178f55e
18 changed files with 203 additions and 12 deletions
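Before the per-file diffs, a sketch of what the change does end to end: the MR ApplicationMaster now takes the queue name from the RM's register response (the queue the scheduler actually placed the job in, which can differ from the submitted one when automatic placement is in effect), stores it on the job, and logs a new JOB_QUEUE_CHANGED history event. The toy model below is purely illustrative — the stand-in names are hypothetical; the real classes involved are RMCommunicator, JobImpl, JobHistoryEventHandler, and the new JobQueueChangeEvent shown further down:

```java
// Hypothetical, simplified model of the flow this commit wires up.
import java.util.ArrayList;
import java.util.List;

public class QueuePlacementFlowSketch {
  // Stand-in for the history event stream the AM writes.
  static List<String> historyLog = new ArrayList<>();

  // Stand-in for JobImpl.setQueueName(): remember the new queue and log an event.
  static String queueName = "default";          // queue named at submit time
  static void setQueueName(String newQueue) {
    queueName = newQueue;
    historyLog.add("JOB_QUEUE_CHANGED -> " + newQueue);
  }

  public static void main(String[] args) {
    // Stand-in for RMCommunicator.register(): the RM's register response
    // carries the queue the job was actually placed in (e.g. by automatic
    // queue placement). The value here is made up.
    String queueFromRm = "root.users.alice";
    setQueueName(queueFromRm);

    System.out.println("queue reported to clients/history: " + queueName);
    System.out.println(historyLog);
  }
}
```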

View File

@@ -61,6 +61,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the
YARN resource model (Sandy Ryza)
MAPREDUCE-5732. Report proper queue when job has been automatically placed
(Sandy Ryza)
OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

View File

@@ -525,6 +525,12 @@ public class JobHistoryEventHandler extends AbstractService
JobInitedEvent jie = (JobInitedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setJobStartTime(jie.getLaunchTime());
}
if (event.getHistoryEvent().getEventType() == EventType.JOB_QUEUE_CHANGED) {
JobQueueChangeEvent jQueueEvent =
(JobQueueChangeEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setQueueName(jQueueEvent.getJobQueueName());
}
// If this is JobFinishedEvent, close the writer and setup the job-index
if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {

View File

@@ -39,7 +39,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
/**
* Main interface to interact with the job. Provides only getters.
* Main interface to interact with the job.
*/
public interface Job {
@@ -98,4 +98,6 @@ public interface Job {
List<AMInfo> getAMInfos();
boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation);
public void setQueueName(String queueName);
}

View File

@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobQueueChangeEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
@@ -181,7 +182,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final EventHandler eventHandler;
private final MRAppMetrics metrics;
private final String userName;
private final String queueName;
private String queueName;
private final long appSubmitTime;
private final AppContext appContext;
@@ -1123,6 +1124,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
return queueName;
}
@Override
public void setQueueName(String queueName) {
this.queueName = queueName;
JobQueueChangeEvent jqce = new JobQueueChangeEvent(oldJobId, queueName);
eventHandler.handle(new JobHistoryEvent(jobId, jqce));
}
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getConfFile()

View File

@@ -109,11 +109,11 @@ public abstract class RMCommunicator extends AbstractService
@Override
protected void serviceStart() throws Exception {
scheduler= createSchedulerProxy();
register();
startAllocatorThread();
JobID id = TypeConverter.fromYarn(this.applicationId);
JobId jobId = TypeConverter.toYarn(id);
job = context.getJob(jobId);
register();
startAllocatorThread();
super.serviceStart();
}
@@ -161,6 +161,9 @@
}
this.applicationACLs = response.getApplicationACLs();
LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
String queue = response.getQueue();
LOG.info("queue: " + queue);
job.setQueueName(queue);
} catch (Exception are) {
LOG.error("Exception while registering", are);
throw new YarnRuntimeException(are);

View File

@@ -81,6 +81,15 @@ public class TestEvents {
assertEquals(test.getPriority(), JobPriority.LOW);
}
@Test(timeout = 10000)
public void testJobQueueChange() throws Exception {
org.apache.hadoop.mapreduce.JobID jid = new JobID("001", 1);
JobQueueChangeEvent test = new JobQueueChangeEvent(jid,
"newqueue");
assertEquals(test.getJobId().toString(), jid.toString());
assertEquals(test.getJobQueueName(), "newqueue");
}
/**
* simple test TaskUpdatedEvent and TaskUpdated

View File

@@ -116,6 +116,9 @@ public class MRApp extends MRAppMaster {
private File testWorkDir;
private Path testAbsPath;
private ClusterInfo clusterInfo;
// Queue to pretend the RM assigned us
private String assignedQueue;
public static String NM_HOST = "localhost";
public static int NM_PORT = 1234;
@@ -132,7 +135,7 @@ public class MRApp extends MRAppMaster {
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, Clock clock) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock);
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock, null);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@@ -145,6 +148,12 @@ public class MRApp extends MRAppMaster {
boolean cleanOnStart) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, String assignedQueue) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1,
new SystemClock(), assignedQueue);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, boolean unregistered) {
@@ -177,7 +186,7 @@ public class MRApp extends MRAppMaster {
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
new SystemClock());
new SystemClock(), null);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@@ -190,33 +199,34 @@ public class MRApp extends MRAppMaster {
boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) {
this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount, clock, unregistered);
cleanOnStart, startCount, clock, unregistered, null);
}
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock) {
boolean cleanOnStart, int startCount, Clock clock, String assignedQueue) {
this(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), maps, reduces, autoComplete, testName,
cleanOnStart, startCount, clock, true);
cleanOnStart, startCount, clock, true, assignedQueue);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, boolean unregistered) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock(), unregistered);
cleanOnStart, startCount, new SystemClock(), unregistered, null);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock(), true);
cleanOnStart, startCount, new SystemClock(), true, null);
}
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, Clock clock, boolean unregistered) {
boolean cleanOnStart, int startCount, Clock clock, boolean unregistered,
String assignedQueue) {
super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
this.testWorkDir = new File("target", testName);
@@ -238,6 +248,7 @@ public class MRApp extends MRAppMaster {
// If safeToReportTerminationToUser is set to true, we can verify whether
// the job can reach the final state when MRAppMaster shuts down.
this.successfullyUnregistered.set(unregistered);
this.assignedQueue = assignedQueue;
}
@Override
@@ -284,6 +295,9 @@ public class MRApp extends MRAppMaster {
start();
DefaultMetricsSystem.shutdown();
Job job = getContext().getAllJobs().values().iterator().next();
if (assignedQueue != null) {
job.setQueueName(assignedQueue);
}
// Write job.xml
String jobFile = MRApps.getJobFile(conf, user,

View File

@@ -39,6 +39,7 @@ public class MockAppContext implements AppContext {
final Map<JobId, Job> jobs;
final long startTime = System.currentTimeMillis();
Set<String> blacklistedNodes;
String queue;
public MockAppContext(int appid) {
appID = MockJobs.newAppID(appid);

View File

@@ -629,6 +629,11 @@ public class MockJobs extends MockApps {
jobConf.addResource(fc.open(configFile), configFile.toString());
return jobConf;
}
@Override
public void setQueueName(String queueName) {
// do nothing
}
};
}

View File

@@ -505,6 +505,11 @@ public class TestRuntimeEstimators {
public Configuration loadConfFile() {
throw new UnsupportedOperationException();
}
@Override
public void setQueueName(String queueName) {
// do nothing
}
}
/*

View File

@@ -122,6 +122,13 @@
]
},
{"type": "record", "name": "JobQueueChange",
"fields": [
{"name": "jobid", "type": "string"},
{"name": "jobQueueName", "type": "string"}
]
},
{"type": "record", "name": "JobUnsuccessfulCompletion",
"fields": [
{"name": "jobid", "type": "string"},
@@ -267,6 +274,7 @@
"JOB_FINISHED",
"JOB_PRIORITY_CHANGED",
"JOB_STATUS_CHANGED",
"JOB_QUEUE_CHANGED",
"JOB_FAILED",
"JOB_KILLED",
"JOB_ERROR",
@@ -306,6 +314,7 @@
"JobInited",
"AMStarted",
"JobPriorityChange",
"JobQueueChange",
"JobStatusChanged",
"JobSubmitted",
"JobUnsuccessfulCompletion",

View File

@@ -98,6 +98,8 @@ public class EventReader implements Closeable {
result = new JobFinishedEvent(); break;
case JOB_PRIORITY_CHANGED:
result = new JobPriorityChangeEvent(); break;
case JOB_QUEUE_CHANGED:
result = new JobQueueChangeEvent(); break;
case JOB_STATUS_CHANGED:
result = new JobStatusChangedEvent(); break;
case JOB_FAILED:

View File

@@ -183,6 +183,9 @@ public class JobHistoryParser implements HistoryEventHandler {
case JOB_PRIORITY_CHANGED:
handleJobPriorityChangeEvent((JobPriorityChangeEvent) event);
break;
case JOB_QUEUE_CHANGED:
handleJobQueueChangeEvent((JobQueueChangeEvent) event);
break;
case JOB_FAILED:
case JOB_KILLED:
case JOB_ERROR:
@@ -385,6 +388,10 @@ public class JobHistoryParser implements HistoryEventHandler {
private void handleJobPriorityChangeEvent(JobPriorityChangeEvent event) {
info.priority = event.getPriority();
}
private void handleJobQueueChangeEvent(JobQueueChangeEvent event) {
info.jobQueueName = event.getJobQueueName();
}
private void handleJobInitedEvent(JobInitedEvent event) {
info.launchTime = event.getLaunchTime();
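
The parser change above is what makes the re-assigned queue visible to history consumers: once the AM has logged a JOB_QUEUE_CHANGED event, replaying the history file yields the queue the RM actually placed the job in rather than the one named at submit time. A minimal read-back sketch (not part of this commit), assuming a .jhist path passed on the command line and the JobHistoryParser.JobInfo#getJobQueueName() accessor:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

public class PrintHistoryQueue {
  public static void main(String[] args) throws Exception {
    // args[0]: path to a .jhist file written after this change
    Path historyFile = new Path(args[0]);
    FileSystem fs = historyFile.getFileSystem(new Configuration());

    // The parser replays the logged events; a JOB_QUEUE_CHANGED event
    // overwrites the submit-time queue with the queue the RM placed the job in.
    JobInfo info = new JobHistoryParser(fs, historyFile).parse();
    System.out.println("Queue recorded in history: " + info.getJobQueueName());
  }
}
```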

View File

@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapreduce.JobID;
@SuppressWarnings("deprecation")
public class JobQueueChangeEvent implements HistoryEvent {
private JobQueueChange datum = new JobQueueChange();
public JobQueueChangeEvent(JobID id, String queueName) {
datum.jobid = new Utf8(id.toString());
datum.jobQueueName = new Utf8(queueName);
}
JobQueueChangeEvent() { }
@Override
public EventType getEventType() {
return EventType.JOB_QUEUE_CHANGED;
}
@Override
public Object getDatum() {
return datum;
}
@Override
public void setDatum(Object datum) {
this.datum = (JobQueueChange) datum;
}
/** Get the Job ID */
public JobID getJobId() {
return JobID.forName(datum.jobid.toString());
}
/** Get the new Job queue name */
public String getJobQueueName() {
if (datum.jobQueueName != null) {
return datum.jobQueueName.toString();
}
return null;
}
}
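
For reference, a small usage sketch of the new event class (not part of the commit); the job id and queue name below are made up, and inside the AM the event is actually wrapped in a JobHistoryEvent by JobImpl.setQueueName(), as shown earlier:

```java
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.jobhistory.JobQueueChangeEvent;

public class JobQueueChangeEventDemo {
  public static void main(String[] args) {
    // Hypothetical job id and RM-assigned queue, for illustration only.
    JobID jobId = new JobID("201401300000", 7);
    JobQueueChangeEvent event = new JobQueueChangeEvent(jobId, "root.autoplaced");

    // These fields are what the JOB_QUEUE_CHANGED record carries into the history file.
    System.out.println(event.getEventType());      // JOB_QUEUE_CHANGED
    System.out.println(event.getJobId());          // job_201401300000_0007
    System.out.println(event.getJobQueueName());   // root.autoplaced
  }
}
```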

View File

@@ -453,4 +453,9 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
}
return amInfos;
}
@Override
public void setQueueName(String queueName) {
throw new UnsupportedOperationException("Can't set job's queue name in history");
}
}

View File

@@ -190,5 +190,10 @@ public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
public List<AMInfo> getAMInfos() {
return null;
}
@Override
public void setQueueName(String queueName) {
throw new UnsupportedOperationException("Can't set job's queue name in history");
}
}

View File

@@ -155,6 +155,41 @@ public class TestJobHistoryEvents {
Assert.assertEquals("JobHistoryEventHandler",
services[services.length - 1].getName());
}
@Test
public void testAssignedQueue() throws Exception {
Configuration conf = new Configuration();
MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(),
true, "assignedQueue");
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
app.waitForState(job, JobState.SUCCEEDED);
//make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
/*
* Use HistoryContext to read the logged events and verify the queue
* name recorded for the job
*/
HistoryContext context = new JobHistory();
// test start and stop states
((JobHistory)context).init(conf);
((JobHistory)context).start();
Assert.assertTrue( context.getStartTime()>0);
Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STARTED);
// get job before stopping JobHistory
Job parsedJob = context.getJob(jobId);
// stop JobHistory
((JobHistory)context).stop();
Assert.assertEquals(((JobHistory)context).getServiceState(),Service.STATE.STOPPED);
Assert.assertEquals("QueueName not correct", "assignedQueue",
parsedJob.getQueueName());
}
private void verifyTask(Task task) {
Assert.assertEquals("Task state not currect", TaskState.SUCCEEDED,
@@ -184,6 +219,11 @@ public class TestJobHistoryEvents {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, String assignedQueue) {
super(maps, reduces, autoComplete, testName, cleanOnStart, assignedQueue);
}
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {

View File

@@ -415,5 +415,9 @@ public class TestHsWebServicesAcls {
return aclsMgr.checkAccess(callerUGI, jobOperation,
this.getUserName(), jobAcls.get(jobOperation));
}
@Override
public void setQueueName(String queueName) {
}
}
}