YARN-4164. Changed updateApplicationPriority API to return the updated application priority. Contributed by Rohith Sharma K S

This commit is contained in:
Jian He 2015-12-18 14:13:48 -08:00
parent e63388fdf2
commit 85c2466048
10 changed files with 140 additions and 35 deletions

View File

@ -476,9 +476,9 @@ public class ResourceMgrDelegate extends YarnClient {
} }
@Override @Override
public void updateApplicationPriority(ApplicationId applicationId, public Priority updateApplicationPriority(ApplicationId applicationId,
Priority priority) throws YarnException, IOException { Priority priority) throws YarnException, IOException {
client.updateApplicationPriority(applicationId, priority); return client.updateApplicationPriority(applicationId, priority);
} }
@Override @Override

View File

@ -629,6 +629,9 @@ Release 2.8.0 - UNRELEASED
YARN-4293. ResourceUtilization should be a part of yarn node CLI. (Sunil G via wangda) YARN-4293. ResourceUtilization should be a part of yarn node CLI. (Sunil G via wangda)
YARN-4164. Changed updateApplicationPriority API to return the updated
application priority. (Rohith Sharma K S via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
@ -39,9 +40,24 @@ import org.apache.hadoop.yarn.util.Records;
@Unstable @Unstable
public abstract class UpdateApplicationPriorityResponse { public abstract class UpdateApplicationPriorityResponse {
public static UpdateApplicationPriorityResponse newInstance() { public static UpdateApplicationPriorityResponse newInstance(
Priority priority) {
UpdateApplicationPriorityResponse response = UpdateApplicationPriorityResponse response =
Records.newRecord(UpdateApplicationPriorityResponse.class); Records.newRecord(UpdateApplicationPriorityResponse.class);
response.setApplicationPriority(priority);
return response; return response;
} }
/**
* Get the <code>Priority</code> of the application to be set.
* @return Updated <code>Priority</code> of the application.
*/
public abstract Priority getApplicationPriority();
/**
* Set the <code>Priority</code> of the application.
*
* @param priority <code>Priority</code> of the application
*/
public abstract void setApplicationPriority(Priority priority);
} }

View File

@ -231,6 +231,7 @@ message UpdateApplicationPriorityRequestProto {
} }
message UpdateApplicationPriorityResponseProto { message UpdateApplicationPriorityResponseProto {
optional PriorityProto applicationPriority = 1;
} }
message SignalContainerRequestProto { message SignalContainerRequestProto {

View File

@ -692,12 +692,14 @@ public abstract class YarnClient extends AbstractService {
* </p> * </p>
* @param applicationId * @param applicationId
* @param priority * @param priority
* @return updated priority of an application.
* @throws YarnException * @throws YarnException
* @throws IOException * @throws IOException
*/ */
@Public @Public
@Unstable @Unstable
public abstract void updateApplicationPriority(ApplicationId applicationId, public abstract Priority updateApplicationPriority(
ApplicationId applicationId,
Priority priority) throws YarnException, IOException; Priority priority) throws YarnException, IOException;
/** /**

View File

@ -832,11 +832,11 @@ public class YarnClientImpl extends YarnClient {
} }
@Override @Override
public void updateApplicationPriority(ApplicationId applicationId, public Priority updateApplicationPriority(ApplicationId applicationId,
Priority priority) throws YarnException, IOException { Priority priority) throws YarnException, IOException {
UpdateApplicationPriorityRequest request = UpdateApplicationPriorityRequest request =
UpdateApplicationPriorityRequest.newInstance(applicationId, priority); UpdateApplicationPriorityRequest.newInstance(applicationId, priority);
rmClient.updateApplicationPriority(request); return rmClient.updateApplicationPriority(request).getApplicationPriority();
} }
@Override @Override

View File

@ -713,8 +713,17 @@ public class ApplicationCLI extends YarnCLI {
ApplicationId appId = ConverterUtils.toApplicationId(applicationId); ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
Priority newAppPriority = Priority.newInstance(Integer.parseInt(priority)); Priority newAppPriority = Priority.newInstance(Integer.parseInt(priority));
sysout.println("Updating priority of an aplication " + applicationId); sysout.println("Updating priority of an aplication " + applicationId);
client.updateApplicationPriority(appId, newAppPriority); Priority updateApplicationPriority =
sysout.println("Successfully updated the application with id " client.updateApplicationPriority(appId, newAppPriority);
+ applicationId + " with priority '" + priority + "'"); if (newAppPriority.equals(updateApplicationPriority)) {
sysout.println("Successfully updated the application "
+ applicationId + " with priority '" + priority + "'");
} else {
sysout
.println("Updated priority of an application "
+ applicationId
+ " to cluster max priority OR keeping old priority"
+ " as application is in final states");
}
} }
} }

View File

@ -19,7 +19,11 @@
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProtoOrBuilder;
import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat;
@ -31,6 +35,8 @@ public class UpdateApplicationPriorityResponsePBImpl extends
UpdateApplicationPriorityResponseProto.Builder builder = null; UpdateApplicationPriorityResponseProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private Priority updatedAppPriority = null;
public UpdateApplicationPriorityResponsePBImpl() { public UpdateApplicationPriorityResponsePBImpl() {
builder = UpdateApplicationPriorityResponseProto.newBuilder(); builder = UpdateApplicationPriorityResponseProto.newBuilder();
} }
@ -42,11 +48,68 @@ public class UpdateApplicationPriorityResponsePBImpl extends
} }
public UpdateApplicationPriorityResponseProto getProto() { public UpdateApplicationPriorityResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build(); proto = viaProto ? proto : builder.build();
viaProto = true; viaProto = true;
return proto; return proto;
} }
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = UpdateApplicationPriorityResponseProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
if (this.updatedAppPriority != null) {
builder
.setApplicationPriority(
convertToProtoFormat(this.updatedAppPriority));
}
}
@Override
public Priority getApplicationPriority() {
UpdateApplicationPriorityResponseProtoOrBuilder p =
viaProto ? proto : builder;
if (this.updatedAppPriority != null) {
return this.updatedAppPriority;
}
if (!p.hasApplicationPriority()) {
return null;
}
this.updatedAppPriority =
convertFromProtoFormat(p.getApplicationPriority());
return this.updatedAppPriority;
}
@Override
public void setApplicationPriority(Priority priority) {
maybeInitBuilder();
if (priority == null) {
builder.clearApplicationPriority();
}
this.updatedAppPriority = priority;
}
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}
private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl) t).getProto();
}
@Override @Override
public int hashCode() { public int hashCode() {
return getProto().hashCode(); return getProto().hashCode();

View File

@ -1449,6 +1449,8 @@ public class ClientRMService extends AbstractService implements
RMAuditLogger.logSuccess(callerUGI.getShortUserName(), RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService",
applicationId); applicationId);
response.setApplicationPriority(application
.getApplicationSubmissionContext().getPriority());
return response; return response;
} }
String msg = "Application in " + application.getState() String msg = "Application in " + application.getState()
@ -1472,6 +1474,8 @@ public class ClientRMService extends AbstractService implements
RMAuditLogger.logSuccess(callerUGI.getShortUserName(), RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId); AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId);
response.setApplicationPriority(application
.getApplicationSubmissionContext().getPriority());
return response; return response;
} }
@ -1480,6 +1484,7 @@ public class ClientRMService extends AbstractService implements
* After the request passes some sanity check, it will be delivered * After the request passes some sanity check, it will be delivered
* to RMNodeImpl so that the next NM heartbeat will pick up the signal request * to RMNodeImpl so that the next NM heartbeat will pick up the signal request
*/ */
@SuppressWarnings("unchecked")
@Override @Override
public SignalContainerResponse signalContainer( public SignalContainerResponse signalContainer(
SignalContainerRequest request) throws YarnException, IOException { SignalContainerRequest request) throws YarnException, IOException {

View File

@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -1330,49 +1331,54 @@ public class TestClientRMService {
appPriority, app1.getApplicationSubmissionContext().getPriority() appPriority, app1.getApplicationSubmissionContext().getPriority()
.getPriority()); .getPriority());
appPriority = 9; appPriority = 11;
ClientRMService rmService = rm.getClientRMService(); ClientRMService rmService = rm.getClientRMService();
UpdateApplicationPriorityRequest updateRequest = testAplicationPriorityUpdation(rmService, app1, appPriority, maxPriority);
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
Priority.newInstance(appPriority));
rmService.updateApplicationPriority(updateRequest); appPriority = 9;
testAplicationPriorityUpdation(rmService, app1, appPriority, appPriority);
Assert.assertEquals("Incorrect priority has been set to application",
appPriority, app1.getApplicationSubmissionContext().getPriority()
.getPriority());
rm.killApp(app1.getApplicationId()); rm.killApp(app1.getApplicationId());
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
appPriority = 8;
UpdateApplicationPriorityRequest updateRequestNew =
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
Priority.newInstance(appPriority));
// Update priority request for application in KILLED state
rmService.updateApplicationPriority(updateRequestNew);
// Hence new priority should not be updated
Assert.assertNotEquals("Priority should not be updated as app is in KILLED state",
appPriority, app1.getApplicationSubmissionContext().getPriority()
.getPriority());
Assert.assertEquals("Priority should be same as old one before update",
9, app1.getApplicationSubmissionContext().getPriority()
.getPriority());
// Update priority request for invalid application id. // Update priority request for invalid application id.
ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3); ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3);
updateRequest = UpdateApplicationPriorityRequest updateRequest =
UpdateApplicationPriorityRequest.newInstance(invalidAppId, UpdateApplicationPriorityRequest.newInstance(invalidAppId,
Priority.newInstance(appPriority)); Priority.newInstance(appPriority));
try { try {
rmService.updateApplicationPriority(updateRequest); rmService.updateApplicationPriority(updateRequest);
Assert Assert.fail("ApplicationNotFoundException should be thrown "
.fail("ApplicationNotFoundException should be thrown for invalid application id"); + "for invalid application id");
} catch (ApplicationNotFoundException e) { } catch (ApplicationNotFoundException e) {
// Expected // Expected
} }
updateRequest =
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
Priority.newInstance(11));
Assert.assertEquals("Incorrect priority has been set to application",
appPriority, rmService.updateApplicationPriority(updateRequest)
.getApplicationPriority().getPriority());
rm.stop(); rm.stop();
} }
private void testAplicationPriorityUpdation(ClientRMService rmService,
RMApp app1, int tobeUpdatedPriority, int expected) throws YarnException,
IOException {
UpdateApplicationPriorityRequest updateRequest =
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
Priority.newInstance(tobeUpdatedPriority));
UpdateApplicationPriorityResponse updateApplicationPriority =
rmService.updateApplicationPriority(updateRequest);
Assert.assertEquals("Incorrect priority has been set to application",
expected, app1.getApplicationSubmissionContext().getPriority()
.getPriority());
Assert.assertEquals("Incorrect priority has been returned", expected,
updateApplicationPriority.getApplicationPriority().getPriority());
}
} }