YARN-4014. Support user cli interface in for Application Priority. Contributed by Rohith Sharma K S
(cherry picked from commit 57c7ae1aff
)
This commit is contained in:
parent
137bde0755
commit
387076894f
|
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||
|
@ -466,4 +467,10 @@ public class ResourceMgrDelegate extends YarnClient {
|
|||
throws YarnException, IOException {
|
||||
return client.getClusterNodeLabels();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateApplicationPriority(ApplicationId applicationId,
|
||||
Priority priority) throws YarnException, IOException {
|
||||
client.updateApplicationPriority(applicationId, priority);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,6 +114,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
|
@ -444,6 +446,13 @@ public class TestClientRedirect {
|
|||
GetLabelsToNodesRequest request) throws YarnException, IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationPriorityResponse updateApplicationPriority(
|
||||
UpdateApplicationPriorityRequest request) throws YarnException,
|
||||
IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
class HistoryService extends AMService implements HSClientProtocol {
|
||||
|
|
|
@ -126,6 +126,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-221. NM should provide a way for AM to tell it not to aggregate logs.
|
||||
(Ming Ma via xgong)
|
||||
|
||||
YARN-4014. Support user cli interface in for Application Priority.
|
||||
(Rohith Sharma K S via jianhe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
|
|
@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -419,4 +421,20 @@ public interface ApplicationClientProtocol extends ApplicationBaseProtocol {
|
|||
@Unstable
|
||||
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
||||
GetClusterNodeLabelsRequest request) throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The interface used by client to set priority of an application.
|
||||
* </p>
|
||||
* @param request to set priority of an application
|
||||
* @return an empty response
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
@Idempotent
|
||||
public UpdateApplicationPriorityResponse updateApplicationPriority(
|
||||
UpdateApplicationPriorityRequest request) throws YarnException,
|
||||
IOException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The request sent by the client to the <code>ResourceManager</code> to set or
|
||||
* update the application priority.
|
||||
* </p>
|
||||
* <p>
|
||||
* The request includes the {@link ApplicationId} of the application and
|
||||
* {@link Priority} to be set for an application
|
||||
* </p>
|
||||
*
|
||||
* @see ApplicationClientProtocol#updateApplicationPriority(UpdateApplicationPriorityRequest)
|
||||
*/
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract class UpdateApplicationPriorityRequest {
|
||||
public static UpdateApplicationPriorityRequest newInstance(
|
||||
ApplicationId applicationId, Priority priority) {
|
||||
UpdateApplicationPriorityRequest request =
|
||||
Records.newRecord(UpdateApplicationPriorityRequest.class);
|
||||
request.setApplicationId(applicationId);
|
||||
request.setApplicationPriority(priority);
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the <code>ApplicationId</code> of the application.
|
||||
*
|
||||
* @return <code>ApplicationId</code> of the application
|
||||
*/
|
||||
public abstract ApplicationId getApplicationId();
|
||||
|
||||
/**
|
||||
* Set the <code>ApplicationId</code> of the application.
|
||||
*
|
||||
* @param applicationId <code>ApplicationId</code> of the application
|
||||
*/
|
||||
public abstract void setApplicationId(ApplicationId applicationId);
|
||||
|
||||
/**
|
||||
* Get the <code>Priority</code> of the application to be set.
|
||||
*
|
||||
* @return <code>Priority</code> of the application to be set.
|
||||
*/
|
||||
public abstract Priority getApplicationPriority();
|
||||
|
||||
/**
|
||||
* Set the <code>Priority</code> of the application.
|
||||
*
|
||||
* @param priority <code>Priority</code> of the application
|
||||
*/
|
||||
public abstract void setApplicationPriority(Priority priority);
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The response sent by the <code>ResourceManager</code> to the client on update
|
||||
* the application priority.
|
||||
* </p>
|
||||
* <p>
|
||||
* A response without exception means that the move has completed successfully.
|
||||
* </p>
|
||||
*
|
||||
* @see ApplicationClientProtocol#updateApplicationPriority(UpdateApplicationPriorityRequest)
|
||||
*/
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract class UpdateApplicationPriorityResponse {
|
||||
|
||||
public static UpdateApplicationPriorityResponse newInstance() {
|
||||
UpdateApplicationPriorityResponse response =
|
||||
Records.newRecord(UpdateApplicationPriorityResponse.class);
|
||||
return response;
|
||||
}
|
||||
}
|
|
@ -55,4 +55,5 @@ service ApplicationClientProtocolService {
|
|||
rpc getNodeToLabels (GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto);
|
||||
rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto);
|
||||
rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
|
||||
rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto);
|
||||
}
|
||||
|
|
|
@ -216,6 +216,14 @@ message GetClusterNodeLabelsResponseProto {
|
|||
repeated NodeLabelProto nodeLabels = 1;
|
||||
}
|
||||
|
||||
message UpdateApplicationPriorityRequestProto {
|
||||
required ApplicationIdProto applicationId = 1;
|
||||
required PriorityProto applicationPriority = 2;
|
||||
}
|
||||
|
||||
message UpdateApplicationPriorityResponseProto {
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////
|
||||
/////// client_NM_Protocol ///////////////////////////
|
||||
//////////////////////////////////////////////////////
|
||||
|
|
|
@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
|
@ -666,4 +669,18 @@ public abstract class YarnClient extends AbstractService {
|
|||
@Unstable
|
||||
public abstract List<NodeLabel> getClusterNodeLabels()
|
||||
throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* The interface used by client to set priority of an application
|
||||
* </p>
|
||||
* @param applicationId
|
||||
* @param priority
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract void updateApplicationPriority(ApplicationId applicationId,
|
||||
Priority priority) throws YarnException, IOException;
|
||||
}
|
||||
|
|
|
@ -77,6 +77,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -89,6 +91,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
|
@ -820,4 +823,12 @@ public class YarnClientImpl extends YarnClient {
|
|||
return rmClient.getClusterNodeLabels(
|
||||
GetClusterNodeLabelsRequest.newInstance()).getNodeLabels();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateApplicationPriority(ApplicationId applicationId,
|
||||
Priority priority) throws YarnException, IOException {
|
||||
UpdateApplicationPriorityRequest request =
|
||||
UpdateApplicationPriorityRequest.newInstance(applicationId, priority);
|
||||
rmClient.updateApplicationPriority(request);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
||||
|
@ -73,6 +74,8 @@ public class ApplicationCLI extends YarnCLI {
|
|||
public static final String APPLICATION = "application";
|
||||
public static final String APPLICATION_ATTEMPT = "applicationattempt";
|
||||
public static final String CONTAINER = "container";
|
||||
public static final String APP_ID = "appId";
|
||||
public static final String UPDATE_PRIORITY = "updatePriority";
|
||||
|
||||
private boolean allAppStates;
|
||||
|
||||
|
@ -117,10 +120,16 @@ public class ApplicationCLI extends YarnCLI {
|
|||
appStateOpt.setArgs(Option.UNLIMITED_VALUES);
|
||||
appStateOpt.setArgName("States");
|
||||
opts.addOption(appStateOpt);
|
||||
opts.addOption(APP_ID, true, "Specify Application Id to be operated");
|
||||
opts.addOption(UPDATE_PRIORITY, true,
|
||||
"update priority of an application. ApplicationId can be"
|
||||
+ " passed using 'appId' option.");
|
||||
opts.getOption(KILL_CMD).setArgName("Application ID");
|
||||
opts.getOption(MOVE_TO_QUEUE_CMD).setArgName("Application ID");
|
||||
opts.getOption(QUEUE_CMD).setArgName("Queue Name");
|
||||
opts.getOption(STATUS_CMD).setArgName("Application ID");
|
||||
opts.getOption(APP_ID).setArgName("Application ID");
|
||||
opts.getOption(UPDATE_PRIORITY).setArgName("Priority");
|
||||
} else if (args.length > 0 && args[0].equalsIgnoreCase(APPLICATION_ATTEMPT)) {
|
||||
title = APPLICATION_ATTEMPT;
|
||||
opts.addOption(STATUS_CMD, true,
|
||||
|
@ -238,6 +247,13 @@ public class ApplicationCLI extends YarnCLI {
|
|||
} else if (cliParser.hasOption(HELP_CMD)) {
|
||||
printUsage(title, opts);
|
||||
return 0;
|
||||
} else if (cliParser.hasOption(UPDATE_PRIORITY)) {
|
||||
if (!cliParser.hasOption(APP_ID)) {
|
||||
printUsage(title, opts);
|
||||
return exitCode;
|
||||
}
|
||||
updateApplicationPriority(cliParser.getOptionValue(APP_ID),
|
||||
cliParser.getOptionValue(UPDATE_PRIORITY));
|
||||
} else {
|
||||
syserr.println("Invalid Command Usage : ");
|
||||
printUsage(title, opts);
|
||||
|
@ -619,4 +635,17 @@ public class ApplicationCLI extends YarnCLI {
|
|||
}
|
||||
writer.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates priority of an application with the given ID.
|
||||
*/
|
||||
private void updateApplicationPriority(String applicationId, String priority)
|
||||
throws YarnException, IOException {
|
||||
ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
|
||||
Priority newAppPriority = Priority.newInstance(Integer.parseInt(priority));
|
||||
sysout.println("Updating priority of an aplication " + applicationId);
|
||||
client.updateApplicationPriority(appId, newAppPriority);
|
||||
sysout.println("Successfully updated the priority of any application "
|
||||
+ applicationId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1409,6 +1409,31 @@ public class TestYarnCLI {
|
|||
Assert.assertNotSame("should return non-zero exit code.", 0, exitCode);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testUpdateApplicationPriority() throws Exception {
|
||||
ApplicationCLI cli = createAndGetAppCLI();
|
||||
ApplicationId applicationId = ApplicationId.newInstance(1234, 6);
|
||||
|
||||
ApplicationReport appReport =
|
||||
ApplicationReport.newInstance(applicationId,
|
||||
ApplicationAttemptId.newInstance(applicationId, 1), "user",
|
||||
"queue", "appname", "host", 124, null,
|
||||
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
|
||||
FinalApplicationStatus.UNDEFINED, null, "N/A", 0.53789f, "YARN",
|
||||
null);
|
||||
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
|
||||
appReport);
|
||||
|
||||
int result =
|
||||
cli.run(new String[] { "application", "-appId",
|
||||
applicationId.toString(),
|
||||
"-updatePriority", "1" });
|
||||
Assert.assertEquals(result, 0);
|
||||
verify(client).updateApplicationPriority(any(ApplicationId.class),
|
||||
any(Priority.class));
|
||||
|
||||
}
|
||||
|
||||
private void verifyUsageInfo(YarnCLI cli) throws Exception {
|
||||
cli.setSysErrPrintStream(sysErr);
|
||||
cli.run(new String[] { "application" });
|
||||
|
@ -1458,6 +1483,7 @@ public class TestYarnCLI {
|
|||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
PrintWriter pw = new PrintWriter(baos);
|
||||
pw.println("usage: application");
|
||||
pw.println(" -appId <Application ID> Specify Application Id to be operated");
|
||||
pw.println(" -appStates <States> Works with -list to filter applications");
|
||||
pw.println(" based on input comma-separated list of");
|
||||
pw.println(" application states. The valid application");
|
||||
|
@ -1480,6 +1506,9 @@ public class TestYarnCLI {
|
|||
pw.println(" specify which queue to move an");
|
||||
pw.println(" application to.");
|
||||
pw.println(" -status <Application ID> Prints the status of the application.");
|
||||
pw.println(" -updatePriority <Priority> update priority of an application.");
|
||||
pw.println(" ApplicationId can be passed using 'appId'");
|
||||
pw.println(" option.");
|
||||
pw.close();
|
||||
String appsHelpStr = baos.toString("UTF-8");
|
||||
return appsHelpStr;
|
||||
|
|
|
@ -75,6 +75,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
|
||||
|
@ -121,6 +123,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionR
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -143,6 +147,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.MoveApplicationAcrossQueue
|
|||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationDeleteRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -507,4 +513,18 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationPriorityResponse updateApplicationPriority(
|
||||
UpdateApplicationPriorityRequest request) throws YarnException, IOException {
|
||||
UpdateApplicationPriorityRequestProto requestProto =
|
||||
((UpdateApplicationPriorityRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new UpdateApplicationPriorityResponsePBImpl(
|
||||
proxy.updateApplicationPriority(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
|
||||
|
@ -96,6 +97,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionR
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationSubmissionResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -137,6 +140,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionReque
|
|||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto;
|
||||
|
||||
|
@ -507,4 +512,21 @@ public class ApplicationClientProtocolPBServiceImpl implements ApplicationClient
|
|||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationPriorityResponseProto updateApplicationPriority(
|
||||
RpcController controller, UpdateApplicationPriorityRequestProto proto)
|
||||
throws ServiceException {
|
||||
UpdateApplicationPriorityRequestPBImpl request =
|
||||
new UpdateApplicationPriorityRequestPBImpl(proto);
|
||||
try {
|
||||
UpdateApplicationPriorityResponse response =
|
||||
real.updateApplicationPriority(request);
|
||||
return ((UpdateApplicationPriorityResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProtoOrBuilder;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class UpdateApplicationPriorityRequestPBImpl extends
|
||||
UpdateApplicationPriorityRequest {
|
||||
|
||||
UpdateApplicationPriorityRequestProto proto =
|
||||
UpdateApplicationPriorityRequestProto
|
||||
.getDefaultInstance();
|
||||
UpdateApplicationPriorityRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private ApplicationId applicationId = null;
|
||||
private Priority applicationPriority = null;
|
||||
|
||||
public UpdateApplicationPriorityRequestPBImpl() {
|
||||
builder = UpdateApplicationPriorityRequestProto.newBuilder();
|
||||
}
|
||||
|
||||
public UpdateApplicationPriorityRequestPBImpl(
|
||||
UpdateApplicationPriorityRequestProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public UpdateApplicationPriorityRequestProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
if (viaProto)
|
||||
maybeInitBuilder();
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = UpdateApplicationPriorityRequestProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (this.applicationId != null) {
|
||||
builder.setApplicationId(convertToProtoFormat(this.applicationId));
|
||||
}
|
||||
if (this.applicationPriority != null) {
|
||||
builder
|
||||
.setApplicationPriority(convertToProtoFormat(this.applicationPriority));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Priority getApplicationPriority() {
|
||||
UpdateApplicationPriorityRequestProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
if (this.applicationPriority != null) {
|
||||
return this.applicationPriority;
|
||||
}
|
||||
if (!p.hasApplicationPriority()) {
|
||||
return null;
|
||||
}
|
||||
this.applicationPriority =
|
||||
convertFromProtoFormat(p.getApplicationPriority());
|
||||
return this.applicationPriority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationPriority(Priority priority) {
|
||||
maybeInitBuilder();
|
||||
if (priority == null)
|
||||
builder.clearApplicationPriority();
|
||||
this.applicationPriority = priority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationId getApplicationId() {
|
||||
UpdateApplicationPriorityRequestProtoOrBuilder p =
|
||||
viaProto ? proto : builder;
|
||||
if (this.applicationId != null) {
|
||||
return applicationId;
|
||||
} // Else via proto
|
||||
if (!p.hasApplicationId()) {
|
||||
return null;
|
||||
}
|
||||
applicationId = convertFromProtoFormat(p.getApplicationId());
|
||||
return applicationId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationId(ApplicationId applicationId) {
|
||||
maybeInitBuilder();
|
||||
if (applicationId == null)
|
||||
builder.clearApplicationId();
|
||||
this.applicationId = applicationId;
|
||||
}
|
||||
|
||||
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
|
||||
return new PriorityPBImpl(p);
|
||||
}
|
||||
|
||||
private PriorityProto convertToProtoFormat(Priority t) {
|
||||
return ((PriorityPBImpl) t).getProto();
|
||||
}
|
||||
|
||||
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
||||
return new ApplicationIdPBImpl(p);
|
||||
}
|
||||
|
||||
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
||||
return ((ApplicationIdPBImpl) t).getProto();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
return false;
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
public class UpdateApplicationPriorityResponsePBImpl extends
|
||||
UpdateApplicationPriorityResponse {
|
||||
|
||||
UpdateApplicationPriorityResponseProto proto =
|
||||
UpdateApplicationPriorityResponseProto.getDefaultInstance();
|
||||
UpdateApplicationPriorityResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
public UpdateApplicationPriorityResponsePBImpl() {
|
||||
builder = UpdateApplicationPriorityResponseProto.newBuilder();
|
||||
}
|
||||
|
||||
public UpdateApplicationPriorityResponsePBImpl(
|
||||
UpdateApplicationPriorityResponseProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public UpdateApplicationPriorityResponseProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
return false;
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
}
|
|
@ -93,6 +93,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
|
@ -105,6 +107,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
|
@ -1303,4 +1306,74 @@ public class ClientRMService extends AbstractService implements
|
|||
}
|
||||
return callerUGI.getShortUserName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationPriorityResponse updateApplicationPriority(
|
||||
UpdateApplicationPriorityRequest request) throws YarnException,
|
||||
IOException {
|
||||
|
||||
ApplicationId applicationId = request.getApplicationId();
|
||||
Priority newAppPriority = request.getApplicationPriority();
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Error getting UGI ", ie);
|
||||
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.UPDATE_APP_PRIORITY,
|
||||
"UNKNOWN", "ClientRMService", "Error getting UGI", applicationId);
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
|
||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||
if (application == null) {
|
||||
RMAuditLogger.logFailure(callerUGI.getUserName(),
|
||||
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
|
||||
"Trying to update priority of an absent application", applicationId);
|
||||
throw new ApplicationNotFoundException(
|
||||
"Trying to update priority o an absent application " + applicationId);
|
||||
}
|
||||
|
||||
if (!checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationAccessType.MODIFY_APP, application)) {
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.UPDATE_APP_PRIORITY,
|
||||
"User doesn't have permissions to "
|
||||
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
|
||||
AuditConstants.UNAUTHORIZED_USER, applicationId);
|
||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
||||
+ callerUGI.getShortUserName() + " cannot perform operation "
|
||||
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
||||
}
|
||||
|
||||
// Update priority only when app is tracked by the scheduler
|
||||
if (!EnumSet.of(RMAppState.ACCEPTED, RMAppState.RUNNING).contains(
|
||||
application.getState())) {
|
||||
String msg =
|
||||
"Application in " + application.getState()
|
||||
+ " state cannot be update priority.";
|
||||
RMAuditLogger
|
||||
.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
|
||||
msg);
|
||||
throw new YarnException(msg);
|
||||
}
|
||||
|
||||
try {
|
||||
rmContext.getScheduler().updateApplicationPriority(newAppPriority,
|
||||
applicationId);
|
||||
} catch (YarnException ex) {
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
|
||||
ex.getMessage());
|
||||
throw ex;
|
||||
}
|
||||
|
||||
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId);
|
||||
UpdateApplicationPriorityResponse response =
|
||||
recordFactory
|
||||
.newRecordInstance(UpdateApplicationPriorityResponse.class);
|
||||
return response;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -54,6 +54,8 @@ public class RMAuditLogger {
|
|||
public static final String UNREGISTER_AM = "Unregister App Master";
|
||||
public static final String ALLOC_CONTAINER = "AM Allocated Container";
|
||||
public static final String RELEASE_CONTAINER = "AM Released Container";
|
||||
public static final String UPDATE_APP_PRIORITY =
|
||||
"Update Application Priority Request";
|
||||
|
||||
// Some commonly used descriptions
|
||||
public static final String UNAUTHORIZED_USER = "Unauthorized user";
|
||||
|
|
|
@ -215,8 +215,10 @@ public abstract class RMStateStore extends AbstractService {
|
|||
LOG.info("Updating info for app: " + appId);
|
||||
try {
|
||||
store.updateApplicationStateInternal(appId, appState);
|
||||
if (((RMStateUpdateAppEvent) event).isNotifyApplication()) {
|
||||
store.notifyApplication(new RMAppEvent(appId,
|
||||
RMAppEventType.APP_UPDATE_SAVED));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error updating app: " + appId, e);
|
||||
store.notifyStoreOperationFailed(e);
|
||||
|
@ -707,8 +709,8 @@ public abstract class RMStateStore extends AbstractService {
|
|||
}
|
||||
|
||||
public void updateApplicationStateSynchronously(
|
||||
ApplicationStateData appState) {
|
||||
handleStoreEvent(new RMStateUpdateAppEvent(appState));
|
||||
ApplicationStateData appState, boolean notifyApp) {
|
||||
handleStoreEvent(new RMStateUpdateAppEvent(appState, notifyApp));
|
||||
}
|
||||
|
||||
public void updateFencedState() {
|
||||
|
|
|
@ -22,13 +22,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio
|
|||
|
||||
public class RMStateUpdateAppEvent extends RMStateStoreEvent {
|
||||
private final ApplicationStateData appState;
|
||||
// After application state is updated in state store,
|
||||
// should notify back to application or not
|
||||
private boolean notifyApplication;
|
||||
|
||||
public RMStateUpdateAppEvent(ApplicationStateData appState) {
|
||||
super(RMStateStoreEventType.UPDATE_APP);
|
||||
this.appState = appState;
|
||||
this.notifyApplication = true;
|
||||
}
|
||||
|
||||
public RMStateUpdateAppEvent(ApplicationStateData appState, boolean notifyApp) {
|
||||
this(appState);
|
||||
this.notifyApplication = notifyApp;
|
||||
}
|
||||
|
||||
public ApplicationStateData getAppState() {
|
||||
return appState;
|
||||
}
|
||||
|
||||
public boolean isNotifyApplication() {
|
||||
return notifyApplication;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -785,6 +785,17 @@ public class CapacityScheduler extends
|
|||
}
|
||||
application.setCurrentAppAttempt(attempt);
|
||||
|
||||
// Update attempt priority to the latest to avoid race condition i.e
|
||||
// SchedulerApplicationAttempt is created with old priority but it is not
|
||||
// set to SchedulerApplication#setCurrentAppAttempt.
|
||||
// Scenario would occur is
|
||||
// 1. SchdulerApplicationAttempt is created with old priority.
|
||||
// 2. updateApplicationPriority() updates SchedulerApplication. Since
|
||||
// currentAttempt is null, it just return.
|
||||
// 3. ScheduelerApplcationAttempt is set in
|
||||
// SchedulerApplication#setCurrentAppAttempt.
|
||||
attempt.setPriority(application.getPriority());
|
||||
|
||||
queue.submitApplicationAttempt(attempt, application.getUser());
|
||||
LOG.info("Added Application Attempt " + applicationAttemptId
|
||||
+ " to scheduler from user " + application.getUser() + " in queue "
|
||||
|
@ -1853,7 +1864,7 @@ public class CapacityScheduler extends
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void updateApplicationPriority(Priority newPriority,
|
||||
public void updateApplicationPriority(Priority newPriority,
|
||||
ApplicationId applicationId) throws YarnException {
|
||||
Priority appPriority = null;
|
||||
SchedulerApplication<FiCaSchedulerApp> application = applications
|
||||
|
@ -1879,7 +1890,8 @@ public class CapacityScheduler extends
|
|||
ApplicationStateData appState = ApplicationStateData.newInstance(
|
||||
rmApp.getSubmitTime(), rmApp.getStartTime(),
|
||||
rmApp.getApplicationSubmissionContext(), rmApp.getUser());
|
||||
rmContext.getStateStore().updateApplicationStateSynchronously(appState);
|
||||
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
|
||||
false);
|
||||
|
||||
// As we use iterator over a TreeSet for OrderingPolicy, once we change
|
||||
// priority then reinsert back to make order correct.
|
||||
|
|
|
@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -102,6 +103,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
|
||||
|
@ -131,6 +133,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSyst
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
@ -1543,4 +1546,64 @@ public class TestClientRMService {
|
|||
rpc.stopProxy(client, conf);
|
||||
rm.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 120000)
|
||||
public void testUpdateApplicationPriorityRequest() throws Exception {
|
||||
int maxPriority = 10;
|
||||
int appPriorty = 5;
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
|
||||
maxPriority);
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.init(conf);
|
||||
rm.start();
|
||||
|
||||
// Start app1 with appPriority 5
|
||||
RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriorty));
|
||||
|
||||
Assert.assertEquals("Incorrect priority has been set to application",
|
||||
appPriorty, app1.getApplicationSubmissionContext().getPriority()
|
||||
.getPriority());
|
||||
|
||||
appPriorty = 9;
|
||||
ClientRMService rmService = rm.getClientRMService();
|
||||
UpdateApplicationPriorityRequest updateRequest =
|
||||
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
|
||||
Priority.newInstance(appPriorty));
|
||||
|
||||
rmService.updateApplicationPriority(updateRequest);
|
||||
|
||||
Assert.assertEquals("Incorrect priority has been set to application",
|
||||
appPriorty, app1.getApplicationSubmissionContext().getPriority()
|
||||
.getPriority());
|
||||
|
||||
rm.killApp(app1.getApplicationId());
|
||||
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
||||
|
||||
// Update priority request for application in KILLED state
|
||||
try {
|
||||
rmService.updateApplicationPriority(updateRequest);
|
||||
Assert.fail("Can not update priority for an application in KILLED state");
|
||||
} catch (YarnException e) {
|
||||
String msg =
|
||||
"Application in " + app1.getState()
|
||||
+ " state cannot be update priority.";
|
||||
Assert.assertTrue("", msg.contains(e.getMessage()));
|
||||
}
|
||||
|
||||
// Update priority request for invalid application id.
|
||||
ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3);
|
||||
updateRequest =
|
||||
UpdateApplicationPriorityRequest.newInstance(invalidAppId,
|
||||
Priority.newInstance(appPriorty));
|
||||
try {
|
||||
rmService.updateApplicationPriority(updateRequest);
|
||||
Assert
|
||||
.fail("ApplicationNotFoundException should be thrown for invalid application id");
|
||||
} catch (ApplicationNotFoundException e) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue