YARN-4014. Support user cli interface in for Application Priority. Contributed by Rohith Sharma K S

This commit is contained in:
Jian He 2015-08-24 20:36:08 -07:00
parent 3b00eaea25
commit 57c7ae1aff
22 changed files with 713 additions and 7 deletions

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -466,4 +467,10 @@ public class ResourceMgrDelegate extends YarnClient {
throws YarnException, IOException {
return client.getClusterNodeLabels();
}
@Override
public void updateApplicationPriority(ApplicationId applicationId,
Priority priority) throws YarnException, IOException {
client.updateApplicationPriority(applicationId, priority);
}
}

View File

@ -114,6 +114,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -444,6 +446,13 @@ public class TestClientRedirect {
GetLabelsToNodesRequest request) throws YarnException, IOException {
return null;
}
@Override
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request) throws YarnException,
IOException {
return null;
}
}
class HistoryService extends AMService implements HSClientProtocol {

View File

@ -181,6 +181,9 @@ Release 2.8.0 - UNRELEASED
YARN-221. NM should provide a way for AM to tell it not to aggregate logs.
(Ming Ma via xgong)
YARN-4014. Support user cli interface in for Application Priority.
(Rohith Sharma K S via jianhe)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -419,4 +421,20 @@ public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
@Unstable
public GetClusterNodeLabelsResponse getClusterNodeLabels(
GetClusterNodeLabelsRequest request) throws YarnException, IOException;
/**
* <p>
* The interface used by client to set priority of an application.
* </p>
* @param request to set priority of an application
* @return an empty response
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
@Idempotent
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request) throws YarnException,
IOException;
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>
* The request sent by the client to the <code>ResourceManager</code> to set or
* update the application priority.
* </p>
* <p>
* The request includes the {@link ApplicationId} of the application and
* {@link Priority} to be set for an application
* </p>
*
* @see ApplicationClientProtocol#updateApplicationPriority(UpdateApplicationPriorityRequest)
*/
@Public
@Unstable
public abstract class UpdateApplicationPriorityRequest {
public static UpdateApplicationPriorityRequest newInstance(
ApplicationId applicationId, Priority priority) {
UpdateApplicationPriorityRequest request =
Records.newRecord(UpdateApplicationPriorityRequest.class);
request.setApplicationId(applicationId);
request.setApplicationPriority(priority);
return request;
}
/**
* Get the <code>ApplicationId</code> of the application.
*
* @return <code>ApplicationId</code> of the application
*/
public abstract ApplicationId getApplicationId();
/**
* Set the <code>ApplicationId</code> of the application.
*
* @param applicationId <code>ApplicationId</code> of the application
*/
public abstract void setApplicationId(ApplicationId applicationId);
/**
* Get the <code>Priority</code> of the application to be set.
*
* @return <code>Priority</code> of the application to be set.
*/
public abstract Priority getApplicationPriority();
/**
* Set the <code>Priority</code> of the application.
*
* @param priority <code>Priority</code> of the application
*/
public abstract void setApplicationPriority(Priority priority);
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>
* The response sent by the <code>ResourceManager</code> to the client on update
* the application priority.
* </p>
* <p>
* A response without exception means that the move has completed successfully.
* </p>
*
* @see ApplicationClientProtocol#updateApplicationPriority(UpdateApplicationPriorityRequest)
*/
@Public
@Unstable
public abstract class UpdateApplicationPriorityResponse {
public static UpdateApplicationPriorityResponse newInstance() {
UpdateApplicationPriorityResponse response =
Records.newRecord(UpdateApplicationPriorityResponse.class);
return response;
}
}

View File

@ -55,4 +55,5 @@ service ApplicationClientProtocolService {
rpc getNodeToLabels (GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto);
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto);
}

View File

@ -216,6 +216,14 @@ message GetClusterNodeLabelsResponseProto {
repeated NodeLabelProto nodeLabels = 1;
}
message UpdateApplicationPriorityRequestProto {
required ApplicationIdProto applicationId = 1;
required PriorityProto applicationPriority = 2;
}
message UpdateApplicationPriorityResponseProto {
}
//////////////////////////////////////////////////////
/////// client_NM_Protocol ///////////////////////////
//////////////////////////////////////////////////////

View File

@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
@ -666,4 +669,18 @@ public abstract class YarnClient extends AbstractService {
@Unstable
public abstract List<NodeLabel> getClusterNodeLabels()
throws YarnException, IOException;
/**
* <p>
* The interface used by client to set priority of an application
* </p>
* @param applicationId
* @param priority
* @throws YarnException
* @throws IOException
*/
@Public
@Unstable
public abstract void updateApplicationPriority(ApplicationId applicationId,
Priority priority) throws YarnException, IOException;
}

View File

@ -77,6 +77,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -89,6 +91,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Token;
@ -820,4 +823,12 @@ public class YarnClientImpl extends YarnClient {
return rmClient.getClusterNodeLabels(
GetClusterNodeLabelsRequest.newInstance()).getNodeLabels();
}
@Override
public void updateApplicationPriority(ApplicationId applicationId,
Priority priority) throws YarnException, IOException {
UpdateApplicationPriorityRequest request =
UpdateApplicationPriorityRequest.newInstance(applicationId, priority);
rmClient.updateApplicationPriority(request);
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
@ -73,6 +74,8 @@ public class ApplicationCLI extends YarnCLI {
public static final String APPLICATION = "application";
public static final String APPLICATION_ATTEMPT = "applicationattempt";
public static final String CONTAINER = "container";
public static final String APP_ID = "appId";
public static final String UPDATE_PRIORITY = "updatePriority";
private boolean allAppStates;
@ -117,10 +120,16 @@ public class ApplicationCLI extends YarnCLI {
appStateOpt.setArgs(Option.UNLIMITED_VALUES);
appStateOpt.setArgName("States");
opts.addOption(appStateOpt);
opts.addOption(APP_ID, true, "Specify Application Id to be operated");
opts.addOption(UPDATE_PRIORITY, true,
"update priority of an application. ApplicationId can be"
+ " passed using 'appId' option.");
opts.getOption(KILL_CMD).setArgName("Application ID");
opts.getOption(MOVE_TO_QUEUE_CMD).setArgName("Application ID");
opts.getOption(QUEUE_CMD).setArgName("Queue Name");
opts.getOption(STATUS_CMD).setArgName("Application ID");
opts.getOption(APP_ID).setArgName("Application ID");
opts.getOption(UPDATE_PRIORITY).setArgName("Priority");
} else if (args.length > 0 && args[0].equalsIgnoreCase(APPLICATION_ATTEMPT)) {
title = APPLICATION_ATTEMPT;
opts.addOption(STATUS_CMD, true,
@ -238,6 +247,13 @@ public class ApplicationCLI extends YarnCLI {
} else if (cliParser.hasOption(HELP_CMD)) {
printUsage(title, opts);
return 0;
} else if (cliParser.hasOption(UPDATE_PRIORITY)) {
if (!cliParser.hasOption(APP_ID)) {
printUsage(title, opts);
return exitCode;
}
updateApplicationPriority(cliParser.getOptionValue(APP_ID),
cliParser.getOptionValue(UPDATE_PRIORITY));
} else {
syserr.println("Invalid Command Usage : ");
printUsage(title, opts);
@ -619,4 +635,17 @@ public class ApplicationCLI extends YarnCLI {
}
writer.flush();
}
/**
* Updates priority of an application with the given ID.
*/
private void updateApplicationPriority(String applicationId, String priority)
throws YarnException, IOException {
ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
Priority newAppPriority = Priority.newInstance(Integer.parseInt(priority));
sysout.println("Updating priority of an aplication " + applicationId);
client.updateApplicationPriority(appId, newAppPriority);
sysout.println("Successfully updated the priority of any application "
+ applicationId);
}
}

View File

@ -1409,6 +1409,31 @@ public class TestYarnCLI {
Assert.assertNotSame("should return non-zero exit code.", 0, exitCode);
}
@Test(timeout = 60000)
public void testUpdateApplicationPriority() throws Exception {
ApplicationCLI cli = createAndGetAppCLI();
ApplicationId applicationId = ApplicationId.newInstance(1234, 6);
ApplicationReport appReport =
ApplicationReport.newInstance(applicationId,
ApplicationAttemptId.newInstance(applicationId, 1), "user",
"queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
FinalApplicationStatus.UNDEFINED, null, "N/A", 0.53789f, "YARN",
null);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
appReport);
int result =
cli.run(new String[] { "application", "-appId",
applicationId.toString(),
"-updatePriority", "1" });
Assert.assertEquals(result, 0);
verify(client).updateApplicationPriority(any(ApplicationId.class),
any(Priority.class));
}
private void verifyUsageInfo(YarnCLI cli) throws Exception {
cli.setSysErrPrintStream(sysErr);
cli.run(new String[] { "application" });
@ -1458,6 +1483,7 @@ public class TestYarnCLI {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("usage: application");
pw.println(" -appId <Application ID> Specify Application Id to be operated");
pw.println(" -appStates <States> Works with -list to filter applications");
pw.println(" based on input comma-separated list of");
pw.println(" application states. The valid application");
@ -1480,6 +1506,9 @@ public class TestYarnCLI {
pw.println(" specify which queue to move an");
pw.println(" application to.");
pw.println(" -status <Application ID> Prints the status of the application.");
pw.println(" -updatePriority <Priority> update priority of an application.");
pw.println(" ApplicationId can be passed using 'appId'");
pw.println(" option.");
pw.close();
String appsHelpStr = baos.toString("UTF-8");
return appsHelpStr;

View File

@ -75,6 +75,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
@ -121,6 +123,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionR
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -143,6 +147,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueue
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationDeleteRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import com.google.protobuf.ServiceException;
@ -507,4 +513,18 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP
return null;
}
}
@Override
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request) throws YarnException, IOException {
UpdateApplicationPriorityRequestProto requestProto =
((UpdateApplicationPriorityRequestPBImpl) request).getProto();
try {
return new UpdateApplicationPriorityResponsePBImpl(
proxy.updateApplicationPriority(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
@ -96,6 +97,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionR
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -137,6 +140,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionReque
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
@ -507,4 +512,21 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient
throw new ServiceException(e);
}
}
@Override
public UpdateApplicationPriorityResponseProto updateApplicationPriority(
RpcController controller, UpdateApplicationPriorityRequestProto proto)
throws ServiceException {
UpdateApplicationPriorityRequestPBImpl request =
new UpdateApplicationPriorityRequestPBImpl(proto);
try {
UpdateApplicationPriorityResponse response =
real.updateApplicationPriority(request);
return ((UpdateApplicationPriorityResponsePBImpl) response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProtoOrBuilder;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class UpdateApplicationPriorityRequestPBImpl extends
UpdateApplicationPriorityRequest {
UpdateApplicationPriorityRequestProto proto =
UpdateApplicationPriorityRequestProto
.getDefaultInstance();
UpdateApplicationPriorityRequestProto.Builder builder = null;
boolean viaProto = false;
private ApplicationId applicationId = null;
private Priority applicationPriority = null;
public UpdateApplicationPriorityRequestPBImpl() {
builder = UpdateApplicationPriorityRequestProto.newBuilder();
}
public UpdateApplicationPriorityRequestPBImpl(
UpdateApplicationPriorityRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public UpdateApplicationPriorityRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = UpdateApplicationPriorityRequestProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
if (this.applicationId != null) {
builder.setApplicationId(convertToProtoFormat(this.applicationId));
}
if (this.applicationPriority != null) {
builder
.setApplicationPriority(convertToProtoFormat(this.applicationPriority));
}
}
@Override
public Priority getApplicationPriority() {
UpdateApplicationPriorityRequestProtoOrBuilder p =
viaProto ? proto : builder;
if (this.applicationPriority != null) {
return this.applicationPriority;
}
if (!p.hasApplicationPriority()) {
return null;
}
this.applicationPriority =
convertFromProtoFormat(p.getApplicationPriority());
return this.applicationPriority;
}
@Override
public void setApplicationPriority(Priority priority) {
maybeInitBuilder();
if (priority == null)
builder.clearApplicationPriority();
this.applicationPriority = priority;
}
@Override
public ApplicationId getApplicationId() {
UpdateApplicationPriorityRequestProtoOrBuilder p =
viaProto ? proto : builder;
if (this.applicationId != null) {
return applicationId;
} // Else via proto
if (!p.hasApplicationId()) {
return null;
}
applicationId = convertFromProtoFormat(p.getApplicationId());
return applicationId;
}
@Override
public void setApplicationId(ApplicationId applicationId) {
maybeInitBuilder();
if (applicationId == null)
builder.clearApplicationId();
this.applicationId = applicationId;
}
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}
private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl) t).getProto();
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
import com.google.protobuf.TextFormat;
public class UpdateApplicationPriorityResponsePBImpl extends
UpdateApplicationPriorityResponse {
UpdateApplicationPriorityResponseProto proto =
UpdateApplicationPriorityResponseProto.getDefaultInstance();
UpdateApplicationPriorityResponseProto.Builder builder = null;
boolean viaProto = false;
public UpdateApplicationPriorityResponsePBImpl() {
builder = UpdateApplicationPriorityResponseProto.newBuilder();
}
public UpdateApplicationPriorityResponsePBImpl(
UpdateApplicationPriorityResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public UpdateApplicationPriorityResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -93,6 +93,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -105,6 +107,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
@ -1304,4 +1307,74 @@ public class ClientRMService extends AbstractService implements
}
return callerUGI.getShortUserName();
}
@Override
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request) throws YarnException,
IOException {
ApplicationId applicationId = request.getApplicationId();
Priority newAppPriority = request.getApplicationPriority();
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie) {
LOG.info("Error getting UGI ", ie);
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.UPDATE_APP_PRIORITY,
"UNKNOWN", "ClientRMService", "Error getting UGI", applicationId);
throw RPCUtil.getRemoteException(ie);
}
RMApp application = this.rmContext.getRMApps().get(applicationId);
if (application == null) {
RMAuditLogger.logFailure(callerUGI.getUserName(),
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
"Trying to update priority of an absent application", applicationId);
throw new ApplicationNotFoundException(
"Trying to update priority o an absent application " + applicationId);
}
if (!checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.MODIFY_APP, application)) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_PRIORITY,
"User doesn't have permissions to "
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
AuditConstants.UNAUTHORIZED_USER, applicationId);
throw RPCUtil.getRemoteException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
}
// Update priority only when app is tracked by the scheduler
if (!EnumSet.of(RMAppState.ACCEPTED, RMAppState.RUNNING).contains(
application.getState())) {
String msg =
"Application in " + application.getState()
+ " state cannot be update priority.";
RMAuditLogger
.logFailure(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
msg);
throw new YarnException(msg);
}
try {
rmContext.getScheduler().updateApplicationPriority(newAppPriority,
applicationId);
} catch (YarnException ex) {
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
ex.getMessage());
throw ex;
}
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId);
UpdateApplicationPriorityResponse response =
recordFactory
.newRecordInstance(UpdateApplicationPriorityResponse.class);
return response;
}
}

View File

@ -54,6 +54,8 @@ public class RMAuditLogger {
public static final String UNREGISTER_AM = "Unregister App Master";
public static final String ALLOC_CONTAINER = "AM Allocated Container";
public static final String RELEASE_CONTAINER = "AM Released Container";
public static final String UPDATE_APP_PRIORITY =
"Update Application Priority Request";
// Some commonly used descriptions
public static final String UNAUTHORIZED_USER = "Unauthorized user";

View File

@ -116,7 +116,7 @@ public abstract class RMStateStore extends AbstractService {
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
RMStateStoreEventType.STORE_APP, new StoreAppTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
@ -215,8 +215,10 @@ public abstract class RMStateStore extends AbstractService {
LOG.info("Updating info for app: " + appId);
try {
store.updateApplicationStateInternal(appId, appState);
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_UPDATE_SAVED));
if (((RMStateUpdateAppEvent) event).isNotifyApplication()) {
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_UPDATE_SAVED));
}
} catch (Exception e) {
LOG.error("Error updating app: " + appId, e);
store.notifyStoreOperationFailed(e);
@ -707,8 +709,8 @@ public abstract class RMStateStore extends AbstractService {
}
public void updateApplicationStateSynchronously(
ApplicationStateData appState) {
handleStoreEvent(new RMStateUpdateAppEvent(appState));
ApplicationStateData appState, boolean notifyApp) {
handleStoreEvent(new RMStateUpdateAppEvent(appState, notifyApp));
}
public void updateFencedState() {

View File

@ -22,13 +22,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio
public class RMStateUpdateAppEvent extends RMStateStoreEvent {
private final ApplicationStateData appState;
// After application state is updated in state store,
// should notify back to application or not
private boolean notifyApplication;
public RMStateUpdateAppEvent(ApplicationStateData appState) {
super(RMStateStoreEventType.UPDATE_APP);
this.appState = appState;
this.notifyApplication = true;
}
public RMStateUpdateAppEvent(ApplicationStateData appState, boolean notifyApp) {
this(appState);
this.notifyApplication = notifyApp;
}
public ApplicationStateData getAppState() {
return appState;
}
public boolean isNotifyApplication() {
return notifyApplication;
}
}

View File

@ -785,6 +785,17 @@ public class CapacityScheduler extends
}
application.setCurrentAppAttempt(attempt);
// Update attempt priority to the latest to avoid race condition i.e
// SchedulerApplicationAttempt is created with old priority but it is not
// set to SchedulerApplication#setCurrentAppAttempt.
// Scenario would occur is
// 1. SchdulerApplicationAttempt is created with old priority.
// 2. updateApplicationPriority() updates SchedulerApplication. Since
// currentAttempt is null, it just return.
// 3. ScheduelerApplcationAttempt is set in
// SchedulerApplication#setCurrentAppAttempt.
attempt.setPriority(application.getPriority());
queue.submitApplicationAttempt(attempt, application.getUser());
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user " + application.getUser() + " in queue "
@ -1853,7 +1864,7 @@ public class CapacityScheduler extends
}
@Override
public synchronized void updateApplicationPriority(Priority newPriority,
public void updateApplicationPriority(Priority newPriority,
ApplicationId applicationId) throws YarnException {
Priority appPriority = null;
SchedulerApplication<FiCaSchedulerApp> application = applications
@ -1879,7 +1890,8 @@ public class CapacityScheduler extends
ApplicationStateData appState = ApplicationStateData.newInstance(
rmApp.getSubmitTime(), rmApp.getStartTime(),
rmApp.getApplicationSubmissionContext(), rmApp.getUser());
rmContext.getStateStore().updateApplicationStateSynchronously(appState);
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
false);
// As we use iterator over a TreeSet for OrderingPolicy, once we change
// priority then reinsert back to make order correct.

View File

@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -102,6 +103,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
@ -131,6 +133,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSyst
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -1543,4 +1546,64 @@ public class TestClientRMService {
rpc.stopProxy(client, conf);
rm.close();
}
@Test(timeout = 120000)
public void testUpdateApplicationPriorityRequest() throws Exception {
int maxPriority = 10;
int appPriorty = 5;
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
maxPriority);
MockRM rm = new MockRM(conf);
rm.init(conf);
rm.start();
// Start app1 with appPriority 5
RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriorty));
Assert.assertEquals("Incorrect priority has been set to application",
appPriorty, app1.getApplicationSubmissionContext().getPriority()
.getPriority());
appPriorty = 9;
ClientRMService rmService = rm.getClientRMService();
UpdateApplicationPriorityRequest updateRequest =
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
Priority.newInstance(appPriorty));
rmService.updateApplicationPriority(updateRequest);
Assert.assertEquals("Incorrect priority has been set to application",
appPriorty, app1.getApplicationSubmissionContext().getPriority()
.getPriority());
rm.killApp(app1.getApplicationId());
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
// Update priority request for application in KILLED state
try {
rmService.updateApplicationPriority(updateRequest);
Assert.fail("Can not update priority for an application in KILLED state");
} catch (YarnException e) {
String msg =
"Application in " + app1.getState()
+ " state cannot be update priority.";
Assert.assertTrue("", msg.contains(e.getMessage()));
}
// Update priority request for invalid application id.
ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3);
updateRequest =
UpdateApplicationPriorityRequest.newInstance(invalidAppId,
Priority.newInstance(appPriorty));
try {
rmService.updateApplicationPriority(updateRequest);
Assert
.fail("ApplicationNotFoundException should be thrown for invalid application id");
} catch (ApplicationNotFoundException e) {
// Expected
}
rm.stop();
}
}