MAPREDUCE-6103.Adding reservation APIs to MR resource manager delegate. Contributed by Subru Krishnan and Carlo Curino.

(cherry picked from commit aa92dd45f2)
(cherry picked from commit 3f282762d1)
(cherry picked from commit 30a370e705)
This commit is contained in:
subru 2014-09-24 18:01:38 -07:00 committed by Chris Douglas
parent d244b2ae3a
commit 256a951049
7 changed files with 104 additions and 0 deletions

View File

@ -23,3 +23,6 @@ subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru)
YARN-2080. Integrating reservation system with ResourceManager and
client-RM protocol. (Subru Krishnan and Carlo Curino via subru)
MAPREDUCE-6103. Adding reservation APIs to MR resource manager
delegate. (Subru Krishnan and Carlo Curino via subru)

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
/**
* The job submitter's view of the Job.
@ -112,6 +113,7 @@ public class Job extends JobContextImpl implements JobContext {
private JobStatus status;
private long statustime;
private Cluster cluster;
private ReservationId reservationId;
/**
* @deprecated Use {@link #getInstance()}
@ -1523,5 +1525,24 @@ public class Job extends JobContextImpl implements JobContext {
updateStatus();
return status.isUber();
}
/**
* Get the reservation to which the job is submitted to, if any
*
* @return the reservationId the identifier of the job's reservation, null if
* the job does not have any reservation associated with it
*/
public ReservationId getReservationId() {
return reservationId;
}
/**
* Set the reservation to which the job is submitted to
*
* @param reservationId the reservationId to set
*/
public void setReservationId(ReservationId reservationId) {
this.reservationId = reservationId;
}
}

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.QueueACL;
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
@ -60,6 +61,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
@ -427,6 +429,12 @@ class JobSubmitter {
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// Write job file to submit dir
writeConf(conf, submitJobFile);

View File

@ -63,6 +63,8 @@ public interface MRJobConfig {
public static final String QUEUE_NAME = "mapreduce.job.queuename";
public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
public static final String JOB_TAGS = "mapreduce.job.tags";
public static final String JVM_NUMTASKS_TORUN = "mapreduce.job.jvm.numtasks";

View File

@ -43,6 +43,12 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -406,4 +412,22 @@ public class ResourceMgrDelegate extends YarnClient {
throws YarnException, IOException {
client.moveApplicationAcrossQueues(appId, queue);
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
return client.submitReservation(request);
}
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
return client.updateReservation(request);
}
@Override
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException {
return client.deleteReservation(request);
}
}

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -489,6 +490,26 @@ public class YARNRunner implements ClientProtocol {
appContext.setQueue( // Queue name
jobConf.get(JobContext.QUEUE_NAME,
YarnConfiguration.DEFAULT_QUEUE_NAME));
// add reservationID if present
ReservationId reservationID = null;
try {
reservationID =
ReservationId.parseReservationId(jobConf
.get(JobContext.RESERVATION_ID));
} catch (NumberFormatException e) {
// throw exception as reservationid as is invalid
String errMsg =
"Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
+ " specified for the app: " + applicationId;
LOG.warn(errMsg);
throw new IOException(errMsg);
}
if (reservationID != null) {
appContext.setReservationID(reservationID);
LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
+ " to queue:" + appContext.getQueue() + " with reservationId:"
+ appContext.getReservationID());
}
appContext.setApplicationName( // Job name
jobConf.get(JobContext.JOB_NAME,
YarnConfiguration.DEFAULT_APPLICATION_NAME));
@ -503,6 +524,7 @@ public class YARNRunner implements ClientProtocol {
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
}
return appContext;
}

View File

@ -102,6 +102,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesReq
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -397,6 +403,24 @@ public class TestClientRedirect {
throws YarnException, IOException {
return null;
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
return null;
}
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
return null;
}
@Override
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException {
return null;
}
}
class HistoryService extends AMService implements HSClientProtocol {