YARN-4164. Changed updateApplicationPriority API to return the updated application priority. Contributed by Rohith Sharma K S
This commit is contained in:
parent
31d2fb91fc
commit
1147b086d9
|
@ -476,9 +476,9 @@ public class ResourceMgrDelegate extends YarnClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateApplicationPriority(ApplicationId applicationId,
|
public Priority updateApplicationPriority(ApplicationId applicationId,
|
||||||
Priority priority) throws YarnException, IOException {
|
Priority priority) throws YarnException, IOException {
|
||||||
client.updateApplicationPriority(applicationId, priority);
|
return client.updateApplicationPriority(applicationId, priority);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -577,6 +577,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-4293. ResourceUtilization should be a part of yarn node CLI. (Sunil G via wangda)
|
YARN-4293. ResourceUtilization should be a part of yarn node CLI. (Sunil G via wangda)
|
||||||
|
|
||||||
|
YARN-4164. Changed updateApplicationPriority API to return the updated
|
||||||
|
application priority. (Rohith Sharma K S via jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.api.protocolrecords;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -39,9 +40,24 @@ import org.apache.hadoop.yarn.util.Records;
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract class UpdateApplicationPriorityResponse {
|
public abstract class UpdateApplicationPriorityResponse {
|
||||||
|
|
||||||
public static UpdateApplicationPriorityResponse newInstance() {
|
public static UpdateApplicationPriorityResponse newInstance(
|
||||||
|
Priority priority) {
|
||||||
UpdateApplicationPriorityResponse response =
|
UpdateApplicationPriorityResponse response =
|
||||||
Records.newRecord(UpdateApplicationPriorityResponse.class);
|
Records.newRecord(UpdateApplicationPriorityResponse.class);
|
||||||
|
response.setApplicationPriority(priority);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the <code>Priority</code> of the application to be set.
|
||||||
|
* @return Updated <code>Priority</code> of the application.
|
||||||
|
*/
|
||||||
|
public abstract Priority getApplicationPriority();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the <code>Priority</code> of the application.
|
||||||
|
*
|
||||||
|
* @param priority <code>Priority</code> of the application
|
||||||
|
*/
|
||||||
|
public abstract void setApplicationPriority(Priority priority);
|
||||||
}
|
}
|
||||||
|
|
|
@ -231,6 +231,7 @@ message UpdateApplicationPriorityRequestProto {
|
||||||
}
|
}
|
||||||
|
|
||||||
message UpdateApplicationPriorityResponseProto {
|
message UpdateApplicationPriorityResponseProto {
|
||||||
|
optional PriorityProto applicationPriority = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SignalContainerRequestProto {
|
message SignalContainerRequestProto {
|
||||||
|
|
|
@ -692,12 +692,14 @@ public abstract class YarnClient extends AbstractService {
|
||||||
* </p>
|
* </p>
|
||||||
* @param applicationId
|
* @param applicationId
|
||||||
* @param priority
|
* @param priority
|
||||||
|
* @return updated priority of an application.
|
||||||
* @throws YarnException
|
* @throws YarnException
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract void updateApplicationPriority(ApplicationId applicationId,
|
public abstract Priority updateApplicationPriority(
|
||||||
|
ApplicationId applicationId,
|
||||||
Priority priority) throws YarnException, IOException;
|
Priority priority) throws YarnException, IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -832,11 +832,11 @@ public class YarnClientImpl extends YarnClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateApplicationPriority(ApplicationId applicationId,
|
public Priority updateApplicationPriority(ApplicationId applicationId,
|
||||||
Priority priority) throws YarnException, IOException {
|
Priority priority) throws YarnException, IOException {
|
||||||
UpdateApplicationPriorityRequest request =
|
UpdateApplicationPriorityRequest request =
|
||||||
UpdateApplicationPriorityRequest.newInstance(applicationId, priority);
|
UpdateApplicationPriorityRequest.newInstance(applicationId, priority);
|
||||||
rmClient.updateApplicationPriority(request);
|
return rmClient.updateApplicationPriority(request).getApplicationPriority();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -713,8 +713,17 @@ public class ApplicationCLI extends YarnCLI {
|
||||||
ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
|
ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
|
||||||
Priority newAppPriority = Priority.newInstance(Integer.parseInt(priority));
|
Priority newAppPriority = Priority.newInstance(Integer.parseInt(priority));
|
||||||
sysout.println("Updating priority of an aplication " + applicationId);
|
sysout.println("Updating priority of an aplication " + applicationId);
|
||||||
client.updateApplicationPriority(appId, newAppPriority);
|
Priority updateApplicationPriority =
|
||||||
sysout.println("Successfully updated the application with id "
|
client.updateApplicationPriority(appId, newAppPriority);
|
||||||
+ applicationId + " with priority '" + priority + "'");
|
if (newAppPriority.equals(updateApplicationPriority)) {
|
||||||
|
sysout.println("Successfully updated the application "
|
||||||
|
+ applicationId + " with priority '" + priority + "'");
|
||||||
|
} else {
|
||||||
|
sysout
|
||||||
|
.println("Updated priority of an application "
|
||||||
|
+ applicationId
|
||||||
|
+ " to cluster max priority OR keeping old priority"
|
||||||
|
+ " as application is in final states");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,11 @@
|
||||||
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProtoOrBuilder;
|
||||||
|
|
||||||
import com.google.protobuf.TextFormat;
|
import com.google.protobuf.TextFormat;
|
||||||
|
|
||||||
|
@ -31,6 +35,8 @@ public class UpdateApplicationPriorityResponsePBImpl extends
|
||||||
UpdateApplicationPriorityResponseProto.Builder builder = null;
|
UpdateApplicationPriorityResponseProto.Builder builder = null;
|
||||||
boolean viaProto = false;
|
boolean viaProto = false;
|
||||||
|
|
||||||
|
private Priority updatedAppPriority = null;
|
||||||
|
|
||||||
public UpdateApplicationPriorityResponsePBImpl() {
|
public UpdateApplicationPriorityResponsePBImpl() {
|
||||||
builder = UpdateApplicationPriorityResponseProto.newBuilder();
|
builder = UpdateApplicationPriorityResponseProto.newBuilder();
|
||||||
}
|
}
|
||||||
|
@ -42,11 +48,68 @@ public class UpdateApplicationPriorityResponsePBImpl extends
|
||||||
}
|
}
|
||||||
|
|
||||||
public UpdateApplicationPriorityResponseProto getProto() {
|
public UpdateApplicationPriorityResponseProto getProto() {
|
||||||
|
mergeLocalToProto();
|
||||||
proto = viaProto ? proto : builder.build();
|
proto = viaProto ? proto : builder.build();
|
||||||
viaProto = true;
|
viaProto = true;
|
||||||
return proto;
|
return proto;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToProto() {
|
||||||
|
if (viaProto) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
}
|
||||||
|
mergeLocalToBuilder();
|
||||||
|
proto = builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void maybeInitBuilder() {
|
||||||
|
if (viaProto || builder == null) {
|
||||||
|
builder = UpdateApplicationPriorityResponseProto.newBuilder(proto);
|
||||||
|
}
|
||||||
|
viaProto = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToBuilder() {
|
||||||
|
if (this.updatedAppPriority != null) {
|
||||||
|
builder
|
||||||
|
.setApplicationPriority(
|
||||||
|
convertToProtoFormat(this.updatedAppPriority));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Priority getApplicationPriority() {
|
||||||
|
UpdateApplicationPriorityResponseProtoOrBuilder p =
|
||||||
|
viaProto ? proto : builder;
|
||||||
|
if (this.updatedAppPriority != null) {
|
||||||
|
return this.updatedAppPriority;
|
||||||
|
}
|
||||||
|
if (!p.hasApplicationPriority()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
this.updatedAppPriority =
|
||||||
|
convertFromProtoFormat(p.getApplicationPriority());
|
||||||
|
return this.updatedAppPriority;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setApplicationPriority(Priority priority) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (priority == null) {
|
||||||
|
builder.clearApplicationPriority();
|
||||||
|
}
|
||||||
|
this.updatedAppPriority = priority;
|
||||||
|
}
|
||||||
|
|
||||||
|
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
|
||||||
|
return new PriorityPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private PriorityProto convertToProtoFormat(Priority t) {
|
||||||
|
return ((PriorityPBImpl) t).getProto();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return getProto().hashCode();
|
return getProto().hashCode();
|
||||||
|
|
|
@ -1448,6 +1448,8 @@ public class ClientRMService extends AbstractService implements
|
||||||
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||||
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService",
|
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService",
|
||||||
applicationId);
|
applicationId);
|
||||||
|
response.setApplicationPriority(application
|
||||||
|
.getApplicationSubmissionContext().getPriority());
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
String msg = "Application in " + application.getState()
|
String msg = "Application in " + application.getState()
|
||||||
|
@ -1471,6 +1473,8 @@ public class ClientRMService extends AbstractService implements
|
||||||
|
|
||||||
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||||
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId);
|
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId);
|
||||||
|
response.setApplicationPriority(application
|
||||||
|
.getApplicationSubmissionContext().getPriority());
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1479,6 +1483,7 @@ public class ClientRMService extends AbstractService implements
|
||||||
* After the request passes some sanity check, it will be delivered
|
* After the request passes some sanity check, it will be delivered
|
||||||
* to RMNodeImpl so that the next NM heartbeat will pick up the signal request
|
* to RMNodeImpl so that the next NM heartbeat will pick up the signal request
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public SignalContainerResponse signalContainer(
|
public SignalContainerResponse signalContainer(
|
||||||
SignalContainerRequest request) throws YarnException, IOException {
|
SignalContainerRequest request) throws YarnException, IOException {
|
||||||
|
|
|
@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -1330,49 +1331,54 @@ public class TestClientRMService {
|
||||||
appPriority, app1.getApplicationSubmissionContext().getPriority()
|
appPriority, app1.getApplicationSubmissionContext().getPriority()
|
||||||
.getPriority());
|
.getPriority());
|
||||||
|
|
||||||
appPriority = 9;
|
appPriority = 11;
|
||||||
ClientRMService rmService = rm.getClientRMService();
|
ClientRMService rmService = rm.getClientRMService();
|
||||||
UpdateApplicationPriorityRequest updateRequest =
|
testAplicationPriorityUpdation(rmService, app1, appPriority, maxPriority);
|
||||||
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
|
|
||||||
Priority.newInstance(appPriority));
|
|
||||||
|
|
||||||
rmService.updateApplicationPriority(updateRequest);
|
appPriority = 9;
|
||||||
|
testAplicationPriorityUpdation(rmService, app1, appPriority, appPriority);
|
||||||
Assert.assertEquals("Incorrect priority has been set to application",
|
|
||||||
appPriority, app1.getApplicationSubmissionContext().getPriority()
|
|
||||||
.getPriority());
|
|
||||||
|
|
||||||
rm.killApp(app1.getApplicationId());
|
rm.killApp(app1.getApplicationId());
|
||||||
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
||||||
|
|
||||||
appPriority = 8;
|
|
||||||
UpdateApplicationPriorityRequest updateRequestNew =
|
|
||||||
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
|
|
||||||
Priority.newInstance(appPriority));
|
|
||||||
// Update priority request for application in KILLED state
|
|
||||||
rmService.updateApplicationPriority(updateRequestNew);
|
|
||||||
|
|
||||||
// Hence new priority should not be updated
|
|
||||||
Assert.assertNotEquals("Priority should not be updated as app is in KILLED state",
|
|
||||||
appPriority, app1.getApplicationSubmissionContext().getPriority()
|
|
||||||
.getPriority());
|
|
||||||
Assert.assertEquals("Priority should be same as old one before update",
|
|
||||||
9, app1.getApplicationSubmissionContext().getPriority()
|
|
||||||
.getPriority());
|
|
||||||
|
|
||||||
// Update priority request for invalid application id.
|
// Update priority request for invalid application id.
|
||||||
ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3);
|
ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3);
|
||||||
updateRequest =
|
UpdateApplicationPriorityRequest updateRequest =
|
||||||
UpdateApplicationPriorityRequest.newInstance(invalidAppId,
|
UpdateApplicationPriorityRequest.newInstance(invalidAppId,
|
||||||
Priority.newInstance(appPriority));
|
Priority.newInstance(appPriority));
|
||||||
try {
|
try {
|
||||||
rmService.updateApplicationPriority(updateRequest);
|
rmService.updateApplicationPriority(updateRequest);
|
||||||
Assert
|
Assert.fail("ApplicationNotFoundException should be thrown "
|
||||||
.fail("ApplicationNotFoundException should be thrown for invalid application id");
|
+ "for invalid application id");
|
||||||
} catch (ApplicationNotFoundException e) {
|
} catch (ApplicationNotFoundException e) {
|
||||||
// Expected
|
// Expected
|
||||||
}
|
}
|
||||||
|
|
||||||
|
updateRequest =
|
||||||
|
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
|
||||||
|
Priority.newInstance(11));
|
||||||
|
Assert.assertEquals("Incorrect priority has been set to application",
|
||||||
|
appPriority, rmService.updateApplicationPriority(updateRequest)
|
||||||
|
.getApplicationPriority().getPriority());
|
||||||
|
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testAplicationPriorityUpdation(ClientRMService rmService,
|
||||||
|
RMApp app1, int tobeUpdatedPriority, int expected) throws YarnException,
|
||||||
|
IOException {
|
||||||
|
UpdateApplicationPriorityRequest updateRequest =
|
||||||
|
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
|
||||||
|
Priority.newInstance(tobeUpdatedPriority));
|
||||||
|
|
||||||
|
UpdateApplicationPriorityResponse updateApplicationPriority =
|
||||||
|
rmService.updateApplicationPriority(updateRequest);
|
||||||
|
|
||||||
|
Assert.assertEquals("Incorrect priority has been set to application",
|
||||||
|
expected, app1.getApplicationSubmissionContext().getPriority()
|
||||||
|
.getPriority());
|
||||||
|
Assert.assertEquals("Incorrect priority has been returned", expected,
|
||||||
|
updateApplicationPriority.getApplicationPriority().getPriority());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue