YARN-5865. Retrospect updateApplicationPriority api to handle state store exception in align with YARN-5611. Contributed by Sunil G.

This commit is contained in:
Rohith Sharma K S 2016-11-22 14:49:15 +05:30
parent 6f8074298d
commit a926f895c1
22 changed files with 181 additions and 63 deletions

View File

@ -616,7 +616,7 @@ public AllocateResponse allocate(AllocateRequest request)
// Set application priority // Set application priority
allocateResponse.setApplicationPriority(app allocateResponse.setApplicationPriority(app
.getApplicationSubmissionContext().getPriority()); .getApplicationPriority());
// update AMRMToken if the token is rolled-up // update AMRMToken if the token is rolled-up
MasterKeyData nextMasterKey = MasterKeyData nextMasterKey =

View File

@ -1602,14 +1602,14 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
.newRecordInstance(UpdateApplicationPriorityResponse.class); .newRecordInstance(UpdateApplicationPriorityResponse.class);
// Update priority only when app is tracked by the scheduler // Update priority only when app is tracked by the scheduler
if (!ACTIVE_APP_STATES.contains(application.getState())) { if (!ACTIVE_APP_STATES.contains(application.getState())) {
if (COMPLETED_APP_STATES.contains(application.getState())) { if (application.isAppInCompletedStates()) {
// If Application is in any of the final states, change priority // If Application is in any of the final states, change priority
// can be skipped rather throwing exception. // can be skipped rather throwing exception.
RMAuditLogger.logSuccess(callerUGI.getShortUserName(), RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService",
applicationId); applicationId);
response.setApplicationPriority(application response.setApplicationPriority(application
.getApplicationSubmissionContext().getPriority()); .getApplicationPriority());
return response; return response;
} }
String msg = "Application in " + application.getState() String msg = "Application in " + application.getState()
@ -1622,8 +1622,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
} }
try { try {
rmContext.getScheduler().updateApplicationPriority(newAppPriority, rmAppManager.updateApplicationPriority(applicationId, newAppPriority);
applicationId);
} catch (YarnException ex) { } catch (YarnException ex) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(), RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
@ -1633,8 +1632,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
RMAuditLogger.logSuccess(callerUGI.getShortUserName(), RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId); AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId);
response.setApplicationPriority(application response.setApplicationPriority(application.getApplicationPriority());
.getApplicationSubmissionContext().getPriority());
return response; return response;
} }

View File

@ -357,9 +357,9 @@ private RMAppImpl createAndPopulateNewRMApp(
// Verify and get the update application priority and set back to // Verify and get the update application priority and set back to
// submissionContext // submissionContext
Priority appPriority = rmContext.getScheduler() Priority appPriority = scheduler.checkAndGetApplicationPriority(
.checkAndGetApplicationPriority(submissionContext.getPriority(), user, submissionContext.getPriority(), user, submissionContext.getQueue(),
submissionContext.getQueue(), applicationId); applicationId);
submissionContext.setPriority(appPriority); submissionContext.setPriority(appPriority);
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
@ -521,6 +521,10 @@ public void updateApplicationTimeout(RMApp app,
throws YarnException { throws YarnException {
ApplicationId applicationId = app.getApplicationId(); ApplicationId applicationId = app.getApplicationId();
synchronized (applicationId) { synchronized (applicationId) {
if (app.isAppInCompletedStates()) {
return;
}
Map<ApplicationTimeoutType, Long> newExpireTime = RMServerUtils Map<ApplicationTimeoutType, Long> newExpireTime = RMServerUtils
.validateISO8601AndConvertToLocalTimeEpoch(newTimeoutInISO8601Format); .validateISO8601AndConvertToLocalTimeEpoch(newTimeoutInISO8601Format);
@ -548,4 +552,43 @@ public void updateApplicationTimeout(RMApp app,
((RMAppImpl) app).updateApplicationTimeout(newExpireTime); ((RMAppImpl) app).updateApplicationTimeout(newExpireTime);
} }
} }
/**
* updateApplicationPriority will invoke scheduler api to update the
* new priority to RM and StateStore.
* @param applicationId Application Id
* @param newAppPriority proposed new application priority
* @throws YarnException Handle exceptions
*/
public void updateApplicationPriority(ApplicationId applicationId,
Priority newAppPriority) throws YarnException {
RMApp app = this.rmContext.getRMApps().get(applicationId);
synchronized (applicationId) {
if (app.isAppInCompletedStates()) {
return;
}
// Create a future object to capture exceptions from StateStore.
SettableFuture<Object> future = SettableFuture.create();
// Invoke scheduler api to update priority in scheduler and to
// State Store.
Priority appPriority = rmContext.getScheduler()
.updateApplicationPriority(newAppPriority, applicationId, future);
if (app.getApplicationPriority().equals(appPriority)) {
return;
}
Futures.get(future, YarnException.class);
// update in-memory
((RMAppImpl) app).setApplicationPriority(appPriority);
}
// Update the changed application state to timeline server
rmContext.getSystemMetricsPublisher().appUpdated(app,
System.currentTimeMillis());
}
} }

View File

@ -88,7 +88,7 @@ public void appCreated(RMApp app, long createdTime) {
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
app.getApplicationSubmissionContext().getUnmanagedAM()); app.getApplicationSubmissionContext().getUnmanagedAM());
entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
app.getApplicationSubmissionContext().getPriority().getPriority()); app.getApplicationPriority().getPriority());
entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
app.getAmNodeLabelExpression()); app.getAmNodeLabelExpression());
entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION, entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
@ -164,7 +164,7 @@ public void appUpdated(RMApp app, long updatedTime) {
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
app.getQueue()); app.getQueue());
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
app.getApplicationSubmissionContext().getPriority().getPriority()); app.getApplicationPriority().getPriority());
TimelineEvent tEvent = new TimelineEvent(); TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(updatedTime); tEvent.setTimestamp(updatedTime);

View File

@ -117,7 +117,7 @@ public void appCreated(RMApp app, long createdTime) {
ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO, ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
app.getApplicationSubmissionContext().getUnmanagedAM()); app.getApplicationSubmissionContext().getUnmanagedAM());
entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
app.getApplicationSubmissionContext().getPriority().getPriority()); app.getApplicationPriority().getPriority());
entity.getConfigs().put( entity.getConfigs().put(
ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION, ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
app.getAmNodeLabelExpression()); app.getAmNodeLabelExpression());
@ -272,7 +272,7 @@ public void appUpdated(RMApp app, long currentTimeMillis) {
eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
app.getQueue()); app.getQueue());
eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
app.getApplicationSubmissionContext().getPriority().getPriority()); app.getApplicationPriority().getPriority());
TimelineEvent tEvent = new TimelineEvent(); TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE); tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
tEvent.setTimestamp(currentTimeMillis); tEvent.setTimestamp(currentTimeMillis);

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -283,4 +284,17 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
CallerContext getCallerContext(); CallerContext getCallerContext();
Map<ApplicationTimeoutType, Long> getApplicationTimeouts(); Map<ApplicationTimeoutType, Long> getApplicationTimeouts();
/**
* Get priority of the application.
* @return priority
*/
Priority getApplicationPriority();
/**
* To verify whether app has reached in its completing/completed states.
*
* @return True/False to confirm whether app is in final states
*/
boolean isAppInCompletedStates();
} }

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -197,6 +198,8 @@ public class RMAppImpl implements RMApp, Recoverable {
Object transitionTodo; Object transitionTodo;
private Priority applicationPriority;
private static final StateMachineFactory<RMAppImpl, private static final StateMachineFactory<RMAppImpl,
RMAppState, RMAppState,
RMAppEventType, RMAppEventType,
@ -461,6 +464,10 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.applicationType = applicationType; this.applicationType = applicationType;
this.applicationTags = applicationTags; this.applicationTags = applicationTags;
this.amReq = amReq; this.amReq = amReq;
if (submissionContext.getPriority() != null) {
this.applicationPriority = Priority
.newInstance(submissionContext.getPriority().getPriority());
}
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@ -533,8 +540,6 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD; DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD;
} }
} }
} }
/** /**
@ -777,7 +782,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
createApplicationState(), diags, trackingUrl, this.startTime, createApplicationState(), diags, trackingUrl, this.startTime,
this.finishTime, finishState, appUsageReport, origTrackingUrl, this.finishTime, finishState, appUsageReport, origTrackingUrl,
progress, this.applicationType, amrmToken, applicationTags, progress, this.applicationType, amrmToken, applicationTags,
this.submissionContext.getPriority()); this.getApplicationPriority());
report.setLogAggregationStatus(logAggregationStatus); report.setLogAggregationStatus(logAggregationStatus);
report.setUnmanagedApp(submissionContext.getUnmanagedAM()); report.setUnmanagedApp(submissionContext.getUnmanagedAM());
report.setAppNodeLabelExpression(getAppNodeLabelExpression()); report.setAppNodeLabelExpression(getAppNodeLabelExpression());
@ -1138,14 +1143,14 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
// started or started but not yet saved. // started or started but not yet saved.
if (app.attempts.isEmpty()) { if (app.attempts.isEmpty()) {
app.scheduler.handle(new AppAddedSchedulerEvent(app.user, app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
app.submissionContext, false)); app.submissionContext, false, app.applicationPriority));
return RMAppState.SUBMITTED; return RMAppState.SUBMITTED;
} }
// Add application to scheduler synchronously to guarantee scheduler // Add application to scheduler synchronously to guarantee scheduler
// knows applications before AM or NM re-registers. // knows applications before AM or NM re-registers.
app.scheduler.handle(new AppAddedSchedulerEvent(app.user, app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
app.submissionContext, true)); app.submissionContext, true, app.applicationPriority));
// recover attempts // recover attempts
app.recoverAppAttempts(); app.recoverAppAttempts();
@ -1162,7 +1167,7 @@ private static final class AddApplicationToSchedulerTransition extends
@Override @Override
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new AppAddedSchedulerEvent(app.user, app.handler.handle(new AppAddedSchedulerEvent(app.user,
app.submissionContext, false)); app.submissionContext, false, app.applicationPriority));
// send the ATS create Event // send the ATS create Event
app.sendATSCreateEvent(); app.sendATSCreateEvent();
} }
@ -1619,7 +1624,16 @@ public static boolean isAppInFinalState(RMApp rmApp) {
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|| appState == RMAppState.KILLED; || appState == RMAppState.KILLED;
} }
@Override
public boolean isAppInCompletedStates() {
RMAppState appState = getState();
return appState == RMAppState.FINISHED || appState == RMAppState.FINISHING
|| appState == RMAppState.FAILED || appState == RMAppState.KILLED
|| appState == RMAppState.FINAL_SAVING
|| appState == RMAppState.KILLING;
}
public RMAppState getRecoveredFinalState() { public RMAppState getRecoveredFinalState() {
return this.recoveredFinalState; return this.recoveredFinalState;
} }
@ -2018,4 +2032,13 @@ public void updateApplicationTimeout(
this.writeLock.unlock(); this.writeLock.unlock();
} }
} }
@Override
public Priority getApplicationPriority() {
return applicationPriority;
}
public void setApplicationPriority(Priority applicationPriority) {
this.applicationPriority = applicationPriority;
}
} }

View File

@ -779,10 +779,12 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
} }
@Override @Override
public void updateApplicationPriority(Priority newPriority, public Priority updateApplicationPriority(Priority newPriority,
ApplicationId applicationId) throws YarnException { ApplicationId applicationId, SettableFuture<Object> future)
throws YarnException {
// Dummy Implementation till Application Priority changes are done in // Dummy Implementation till Application Priority changes are done in
// specific scheduler. // specific scheduler.
return Priority.newInstance(0);
} }
@Override @Override

View File

@ -51,6 +51,8 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import com.google.common.util.concurrent.SettableFuture;
/** /**
* This interface is used by the components to talk to the * This interface is used by the components to talk to the
* scheduler for allocating of resources, cleaning up resources. * scheduler for allocating of resources, cleaning up resources.
@ -318,9 +320,14 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
* @param newPriority Submitted Application priority. * @param newPriority Submitted Application priority.
* *
* @param applicationId Application ID * @param applicationId Application ID
*
* @param future Sets any type of exception happened from StateStore
*
* @return updated priority
*/ */
public void updateApplicationPriority(Priority newPriority, public Priority updateApplicationPriority(Priority newPriority,
ApplicationId applicationId) throws YarnException; ApplicationId applicationId, SettableFuture<Object> future)
throws YarnException;
/** /**
* *

View File

@ -145,6 +145,7 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
@LimitedPrivate("yarn") @LimitedPrivate("yarn")
@Evolving @Evolving
@ -2405,8 +2406,9 @@ private Priority getDefaultPriorityForQueue(String queueName) {
} }
@Override @Override
public void updateApplicationPriority(Priority newPriority, public Priority updateApplicationPriority(Priority newPriority,
ApplicationId applicationId) throws YarnException { ApplicationId applicationId, SettableFuture<Object> future)
throws YarnException {
Priority appPriority = null; Priority appPriority = null;
SchedulerApplication<FiCaSchedulerApp> application = applications SchedulerApplication<FiCaSchedulerApp> application = applications
.get(applicationId); .get(applicationId);
@ -2417,38 +2419,36 @@ public void updateApplicationPriority(Priority newPriority,
} }
RMApp rmApp = rmContext.getRMApps().get(applicationId); RMApp rmApp = rmContext.getRMApps().get(applicationId);
appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(), appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(),
rmApp.getQueue(), applicationId); rmApp.getQueue(), applicationId);
if (application.getPriority().equals(appPriority)) { if (application.getPriority().equals(appPriority)) {
return; future.set(null);
return appPriority;
} }
// Update new priority in Submission Context to keep track in HA // Update new priority in Submission Context to update to StateStore.
rmApp.getApplicationSubmissionContext().setPriority(appPriority); rmApp.getApplicationSubmissionContext().setPriority(appPriority);
// Update to state store // Update to state store
ApplicationStateData appState = ApplicationStateData appState = ApplicationStateData.newInstance(
ApplicationStateData.newInstance(rmApp.getSubmitTime(), rmApp.getSubmitTime(), rmApp.getStartTime(),
rmApp.getStartTime(), rmApp.getApplicationSubmissionContext(), rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
rmApp.getUser(), rmApp.getCallerContext()); rmApp.getCallerContext());
appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
rmContext.getStateStore().updateApplicationStateSynchronously(appState, rmContext.getStateStore().updateApplicationStateSynchronously(appState,
false, null); false, future);
// As we use iterator over a TreeSet for OrderingPolicy, once we change // As we use iterator over a TreeSet for OrderingPolicy, once we change
// priority then reinsert back to make order correct. // priority then reinsert back to make order correct.
LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue()); LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue());
queue.updateApplicationPriority(application, appPriority); queue.updateApplicationPriority(application, appPriority);
// Update the changed application state to timeline server
rmContext.getSystemMetricsPublisher().appUpdated(rmApp,
System.currentTimeMillis());
LOG.info("Priority '" + appPriority + "' is updated in queue :" LOG.info("Priority '" + appPriority + "' is updated in queue :"
+ rmApp.getQueue() + " for application: " + applicationId + rmApp.getQueue() + " for application: " + applicationId
+ " for the user: " + rmApp.getUser()); + " for the user: " + rmApp.getUser());
return appPriority;
} }
@Override @Override

View File

@ -43,10 +43,11 @@ public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
} }
public AppAddedSchedulerEvent(String user, public AppAddedSchedulerEvent(String user,
ApplicationSubmissionContext submissionContext, boolean isAppRecovering) { ApplicationSubmissionContext submissionContext, boolean isAppRecovering,
Priority appPriority) {
this(submissionContext.getApplicationId(), submissionContext.getQueue(), this(submissionContext.getApplicationId(), submissionContext.getQueue(),
user, isAppRecovering, submissionContext.getReservationID(), user, isAppRecovering, submissionContext.getReservationID(),
submissionContext.getPriority()); appPriority);
} }
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,

View File

@ -1248,7 +1248,7 @@ public AppPriority getAppPriority(@Context HttpServletRequest hsr,
AppPriority ret = new AppPriority(); AppPriority ret = new AppPriority();
ret.setPriority( ret.setPriority(
app.getApplicationSubmissionContext().getPriority().getPriority()); app.getApplicationPriority().getPriority());
return ret; return ret;
} }
@ -1289,7 +1289,7 @@ public Response updateApplicationPriority(AppPriority targetPriority,
"Trying to update priority an absent application " + appId); "Trying to update priority an absent application " + appId);
throw e; throw e;
} }
Priority priority = app.getApplicationSubmissionContext().getPriority(); Priority priority = app.getApplicationPriority();
if (priority == null if (priority == null
|| priority.getPriority() != targetPriority.getPriority()) { || priority.getPriority() != targetPriority.getPriority()) {
return modifyApplicationPriority(app, callerUGI, return modifyApplicationPriority(app, callerUGI,
@ -1336,7 +1336,7 @@ public Void run() throws IOException, YarnException {
} }
} }
AppPriority ret = new AppPriority( AppPriority ret = new AppPriority(
app.getApplicationSubmissionContext().getPriority().getPriority()); app.getApplicationPriority().getPriority());
return Response.status(Status.OK).entity(ret).build(); return Response.status(Status.OK).entity(ret).build();
} }

View File

@ -148,10 +148,9 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess,
this.name = app.getName().toString(); this.name = app.getName().toString();
this.queue = app.getQueue().toString(); this.queue = app.getQueue().toString();
this.priority = 0; this.priority = 0;
ApplicationSubmissionContext appSubmissionContext =
app.getApplicationSubmissionContext(); if (app.getApplicationPriority() != null) {
if (appSubmissionContext.getPriority() != null) { this.priority = app.getApplicationPriority()
this.priority = appSubmissionContext.getPriority()
.getPriority(); .getPriority();
} }
this.progress = app.getProgress() * 100; this.progress = app.getProgress() * 100;
@ -220,6 +219,8 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess,
vcoreSeconds = appMetrics.getVcoreSeconds(); vcoreSeconds = appMetrics.getVcoreSeconds();
preemptedMemorySeconds = appMetrics.getPreemptedMemorySeconds(); preemptedMemorySeconds = appMetrics.getPreemptedMemorySeconds();
preemptedVcoreSeconds = appMetrics.getPreemptedVcoreSeconds(); preemptedVcoreSeconds = appMetrics.getPreemptedVcoreSeconds();
ApplicationSubmissionContext appSubmissionContext =
app.getApplicationSubmissionContext();
unmanagedApplication = unmanagedApplication =
appSubmissionContext.getUnmanagedAM(); appSubmissionContext.getUnmanagedAM();
appNodeLabelExpression = appNodeLabelExpression =

View File

@ -251,6 +251,7 @@ public void setUp() {
asContext.setApplicationId(appId); asContext.setApplicationId(appId);
asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory)); asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
asContext.setResource(mockResource()); asContext.setResource(mockResource());
asContext.setPriority(Priority.newInstance(0));
setupDispatcher(rmContext, conf); setupDispatcher(rmContext, conf);
} }

View File

@ -461,12 +461,10 @@ public void testPriorityInAllocatedResponse() throws Exception {
AllocateResponse response1 = am1.allocate(allocateRequest); AllocateResponse response1 = am1.allocate(allocateRequest);
Assert.assertEquals(appPriority1, response1.getApplicationPriority()); Assert.assertEquals(appPriority1, response1.getApplicationPriority());
// get scheduler
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// Change the priority of App1 to 8 // Change the priority of App1 to 8
Priority appPriority2 = Priority.newInstance(8); Priority appPriority2 = Priority.newInstance(8);
cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); rm.getRMAppManager().updateApplicationPriority(app1.getApplicationId(),
appPriority2);
AllocateResponse response2 = am1.allocate(allocateRequest); AllocateResponse response2 = am1.allocate(allocateRequest);
Assert.assertEquals(appPriority2, response2.getApplicationPriority()); Assert.assertEquals(appPriority2, response2.getApplicationPriority());

View File

@ -456,7 +456,7 @@ public void testGetContainers() throws YarnException, IOException {
} }
} }
public ClientRMService createRMService() throws IOException { public ClientRMService createRMService() throws IOException, YarnException {
YarnScheduler yarnScheduler = mockYarnScheduler(); YarnScheduler yarnScheduler = mockYarnScheduler();
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext); mockRMContext(yarnScheduler, rmContext);
@ -968,6 +968,7 @@ private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
submissionContext.setApplicationType(appType); submissionContext.setApplicationType(appType);
submissionContext.setApplicationTags(tags); submissionContext.setApplicationTags(tags);
submissionContext.setUnmanagedAM(unmanaged); submissionContext.setUnmanagedAM(unmanaged);
submissionContext.setPriority(Priority.newInstance(0));
SubmitApplicationRequest submitRequest = SubmitApplicationRequest submitRequest =
recordFactory.newRecordInstance(SubmitApplicationRequest.class); recordFactory.newRecordInstance(SubmitApplicationRequest.class);
@ -1042,6 +1043,7 @@ private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class); ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class);
when(asContext.getMaxAppAttempts()).thenReturn(1); when(asContext.getMaxAppAttempts()).thenReturn(1);
when(asContext.getNodeLabelExpression()).thenReturn(appNodeLabelExpression); when(asContext.getNodeLabelExpression()).thenReturn(appNodeLabelExpression);
when(asContext.getPriority()).thenReturn(Priority.newInstance(0));
RMAppImpl app = RMAppImpl app =
spy(new RMAppImpl(applicationId3, rmContext, config, null, null, spy(new RMAppImpl(applicationId3, rmContext, config, null, null,
queueName, asContext, yarnScheduler, null, queueName, asContext, yarnScheduler, null,
@ -1076,6 +1078,7 @@ public ApplicationReport createAndGetApplicationReport(
attempts.put(attemptId, rmAppAttemptImpl); attempts.put(attemptId, rmAppAttemptImpl);
when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl); when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
when(app.getAppAttempts()).thenReturn(attempts); when(app.getAppAttempts()).thenReturn(attempts);
when(app.getApplicationPriority()).thenReturn(Priority.newInstance(0));
when(rmAppAttemptImpl.getMasterContainer()).thenReturn(container); when(rmAppAttemptImpl.getMasterContainer()).thenReturn(container);
ResourceScheduler rs = mock(ResourceScheduler.class); ResourceScheduler rs = mock(ResourceScheduler.class);
when(rmContext.getScheduler()).thenReturn(rs); when(rmContext.getScheduler()).thenReturn(rs);
@ -1098,7 +1101,7 @@ public ApplicationReport createAndGetApplicationReport(
return app; return app;
} }
private static YarnScheduler mockYarnScheduler() { private static YarnScheduler mockYarnScheduler() throws YarnException {
YarnScheduler yarnScheduler = mock(YarnScheduler.class); YarnScheduler yarnScheduler = mock(YarnScheduler.class);
when(yarnScheduler.getMinimumResourceCapability()).thenReturn( when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
Resources.createResource( Resources.createResource(
@ -1116,6 +1119,9 @@ private static YarnScheduler mockYarnScheduler() {
ResourceCalculator rs = mock(ResourceCalculator.class); ResourceCalculator rs = mock(ResourceCalculator.class);
when(yarnScheduler.getResourceCalculator()).thenReturn(rs); when(yarnScheduler.getResourceCalculator()).thenReturn(rs);
when(yarnScheduler.checkAndGetApplicationPriority(any(Priority.class),
anyString(), anyString(), any(ApplicationId.class)))
.thenReturn(Priority.newInstance(0));
return yarnScheduler; return yarnScheduler;
} }
@ -1675,8 +1681,7 @@ public void testUpdateApplicationPriorityRequest() throws Exception {
RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority)); RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority));
Assert.assertEquals("Incorrect priority has been set to application", Assert.assertEquals("Incorrect priority has been set to application",
appPriority, app1.getApplicationSubmissionContext().getPriority() appPriority, app1.getApplicationPriority().getPriority());
.getPriority());
appPriority = 11; appPriority = 11;
ClientRMService rmService = rm.getClientRMService(); ClientRMService rmService = rm.getClientRMService();

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -235,6 +236,16 @@ public CallerContext getCallerContext() {
public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() { public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
@Override
public Priority getApplicationPriority() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public boolean isAppInCompletedStates() {
throw new UnsupportedOperationException("Not supported yet.");
}
} }
public static RMApp newApplication(int i) { public static RMApp newApplication(int i) {

View File

@ -126,6 +126,7 @@ public void testPublishApplicationMetrics() throws Exception {
.thenReturn(Collections.singletonList("java -Xmx1024m")); .thenReturn(Collections.singletonList("java -Xmx1024m"));
when(asc.getAMContainerSpec()).thenReturn(containerLaunchContext); when(asc.getAMContainerSpec()).thenReturn(containerLaunchContext);
when(app.getApplicationSubmissionContext()).thenReturn(asc); when(app.getApplicationSubmissionContext()).thenReturn(asc);
when(app.getApplicationPriority()).thenReturn(Priority.newInstance(1));
metricsPublisher.appUpdated(app, 4L); metricsPublisher.appUpdated(app, 4L);
} else { } else {
metricsPublisher.appUpdated(app, 4L); metricsPublisher.appUpdated(app, 4L);
@ -527,6 +528,7 @@ private static RMApp createRMApp(ApplicationId appId) {
when(amReq.getNodeLabelExpression()).thenReturn("high-mem"); when(amReq.getNodeLabelExpression()).thenReturn("high-mem");
when(app.getAMResourceRequest()).thenReturn(amReq); when(app.getAMResourceRequest()).thenReturn(amReq);
when(app.getAmNodeLabelExpression()).thenCallRealMethod(); when(app.getAmNodeLabelExpression()).thenCallRealMethod();
when(app.getApplicationPriority()).thenReturn(Priority.newInstance(10));
when(app.getCallerContext()) when(app.getCallerContext())
.thenReturn(new CallerContext.Builder("context").build()); .thenReturn(new CallerContext.Builder("context").build());
return app; return app;

View File

@ -362,6 +362,7 @@ private static RMApp createRMApp(ApplicationId appId) {
when(appSubmissionContext.getPriority()) when(appSubmissionContext.getPriority())
.thenReturn(Priority.newInstance(0)); .thenReturn(Priority.newInstance(0));
when(app.getApplicationPriority()).thenReturn(Priority.newInstance(10));
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext containerLaunchContext =
mock(ContainerLaunchContext.class); mock(ContainerLaunchContext.class);
when(containerLaunchContext.getCommands()) when(containerLaunchContext.getCommands())

View File

@ -321,4 +321,14 @@ public void setCollectorAddr(String collectorAddr) {
public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() { public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
@Override
public Priority getApplicationPriority() {
return null;
}
@Override
public boolean isAppInCompletedStates() {
return false;
}
} }

View File

@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -266,6 +267,7 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext)
// applicationId will not be used because RMStateStore is mocked, // applicationId will not be used because RMStateStore is mocked,
// but applicationId is still set for safety // but applicationId is still set for safety
submissionContext.setApplicationId(applicationId); submissionContext.setApplicationId(applicationId);
submissionContext.setPriority(Priority.newInstance(0));
RMApp application = new RMAppImpl(applicationId, rmContext, conf, name, RMApp application = new RMAppImpl(applicationId, rmContext, conf, name,
user, queue, submissionContext, scheduler, masterService, user, queue, submissionContext, scheduler, masterService,

View File

@ -344,7 +344,7 @@ public void testUpdatePriorityAtRuntime() throws Exception {
// Change the priority of App1 to 8 // Change the priority of App1 to 8
Priority appPriority2 = Priority.newInstance(8); Priority appPriority2 = Priority.newInstance(8);
cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null);
// get scheduler app // get scheduler app
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
@ -378,7 +378,7 @@ public void testUpdateInvalidPriorityAtRuntime() throws Exception {
// Change the priority of App1 to 15 // Change the priority of App1 to 15
Priority appPriority2 = Priority.newInstance(15); Priority appPriority2 = Priority.newInstance(15);
cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null);
// get scheduler app // get scheduler app
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
@ -428,7 +428,7 @@ public void testRMRestartWithChangeInPriority() throws Exception {
// Change the priority of App1 to 8 // Change the priority of App1 to 8
Priority appPriority2 = Priority.newInstance(8); Priority appPriority2 = Priority.newInstance(8);
cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); cs.updateApplicationPriority(appPriority2, app1.getApplicationId(), null);
// let things settle down // let things settle down
Thread.sleep(1000); Thread.sleep(1000);
@ -449,8 +449,7 @@ public void testRMRestartWithChangeInPriority() throws Exception {
.get(app1.getApplicationId()); .get(app1.getApplicationId());
// Verify whether priority 15 is reset to 10 // Verify whether priority 15 is reset to 10
Assert.assertEquals(appPriority2, loadedApp.getCurrentAppAttempt() Assert.assertEquals(appPriority2, loadedApp.getApplicationPriority());
.getSubmissionContext().getPriority());
rm2.stop(); rm2.stop();
rm1.stop(); rm1.stop();
@ -558,7 +557,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority()
// Change the priority of App1 to 3 (lowest) // Change the priority of App1 to 3 (lowest)
Priority appPriority3 = Priority.newInstance(3); Priority appPriority3 = Priority.newInstance(3);
cs.updateApplicationPriority(appPriority3, app2.getApplicationId()); cs.updateApplicationPriority(appPriority3, app2.getApplicationId(), null);
// add request for containers App2 // add request for containers App2
am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList<ContainerId>()); am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList<ContainerId>());
@ -790,7 +789,7 @@ private void killAppAndVerifyOrderingPolicy(MockRM rm, CSQueue defaultQueue,
throws YarnException { throws YarnException {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
cs.updateApplicationPriority(Priority.newInstance(2), cs.updateApplicationPriority(Priority.newInstance(2),
app.getApplicationId()); app.getApplicationId(), null);
SchedulerEvent removeAttempt; SchedulerEvent removeAttempt;
removeAttempt = new AppAttemptRemovedSchedulerEvent( removeAttempt = new AppAttemptRemovedSchedulerEvent(
app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED, app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED,