MAPREDUCE-6103.Adding reservation APIs to MR resource manager delegate. Contributed by Subru Krishnan and Carlo Curino.

(cherry picked from commit aa92dd45f2)
(cherry picked from commit 3f282762d1)
This commit is contained in:
subru 2014-09-24 18:01:38 -07:00 committed by Chris Douglas
parent cbfbdf60d6
commit 30a370e705
7 changed files with 104 additions and 0 deletions

View File

@ -23,3 +23,6 @@ subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru)
YARN-2080. Integrating reservation system with ResourceManager and
client-RM protocol. (Subru Krishnan and Carlo Curino via subru)
MAPREDUCE-6103. Adding reservation APIs to MR resource manager
delegate. (Subru Krishnan and Carlo Curino via subru)

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
/**
* The job submitter's view of the Job.
@ -112,6 +113,7 @@ public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
private JobStatus status;
private long statustime;
private Cluster cluster;
private ReservationId reservationId;
/**
* @deprecated Use {@link #getInstance()}
@ -1523,5 +1525,24 @@ public boolean isUber() throws IOException, InterruptedException {
updateStatus();
return status.isUber();
}
/**
* Get the reservation to which the job is submitted to, if any
*
* @return the reservationId the identifier of the job's reservation, null if
* the job does not have any reservation associated with it
*/
public ReservationId getReservationId() {
return reservationId;
}
/**
* Set the reservation to which the job is submitted to
*
* @param reservationId the reservationId to set
*/
public void setReservationId(ReservationId reservationId) {
this.reservationId = reservationId;
}
}

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.QueueACL;
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
@ -60,6 +61,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
@ -427,6 +429,12 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// Write job file to submit dir
writeConf(conf, submitJobFile);

View File

@ -63,6 +63,8 @@ public interface MRJobConfig {
public static final String QUEUE_NAME = "mapreduce.job.queuename";
public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
public static final String JOB_TAGS = "mapreduce.job.tags";
public static final String JVM_NUMTASKS_TORUN = "mapreduce.job.jvm.numtasks";

View File

@ -43,6 +43,12 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -406,4 +412,22 @@ public void moveApplicationAcrossQueues(ApplicationId appId, String queue)
throws YarnException, IOException {
client.moveApplicationAcrossQueues(appId, queue);
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
return client.submitReservation(request);
}
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
return client.updateReservation(request);
}
@Override
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException {
return client.deleteReservation(request);
}
}

View File

@ -76,6 +76,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -489,6 +490,26 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
appContext.setQueue( // Queue name
jobConf.get(JobContext.QUEUE_NAME,
YarnConfiguration.DEFAULT_QUEUE_NAME));
// add reservationID if present
ReservationId reservationID = null;
try {
reservationID =
ReservationId.parseReservationId(jobConf
.get(JobContext.RESERVATION_ID));
} catch (NumberFormatException e) {
// throw exception as reservationid as is invalid
String errMsg =
"Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
+ " specified for the app: " + applicationId;
LOG.warn(errMsg);
throw new IOException(errMsg);
}
if (reservationID != null) {
appContext.setReservationID(reservationID);
LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
+ " to queue:" + appContext.getQueue() + " with reservationId:"
+ appContext.getReservationID());
}
appContext.setApplicationName( // Job name
jobConf.get(JobContext.JOB_NAME,
YarnConfiguration.DEFAULT_APPLICATION_NAME));
@ -503,6 +524,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
}
return appContext;
}

View File

@ -102,6 +102,12 @@
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -397,6 +403,24 @@ public GetContainersResponse getContainers(GetContainersRequest request)
throws YarnException, IOException {
return null;
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
return null;
}
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
return null;
}
@Override
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException {
return null;
}
}
class HistoryService extends AMService implements HSClientProtocol {