YARN-3250. Support admin cli interface in for Application Priority. Contributed by Rohith Sharma K S

(cherry picked from commit a9c8ea71aa)
This commit is contained in:
Jian He 2015-08-27 13:25:48 -07:00
parent 1b01d163a2
commit 306c535395
17 changed files with 399 additions and 8 deletions

View File

@ -129,6 +129,9 @@ Release 2.8.0 - UNRELEASED
YARN-4014. Support user cli interface in for Application Priority.
(Rohith Sharma K S via jianhe)
YARN-3250. Support admin cli interface in for Application Priority.
(Rohith Sharma K S via jianhe)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioning
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
@ -128,4 +130,10 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
throws YarnException, IOException;
@Private
@Idempotent
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
RefreshClusterMaxPriorityRequest request) throws YarnException,
IOException;
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@Private
@Unstable
public abstract class RefreshClusterMaxPriorityRequest {
@Private
@Unstable
public static RefreshClusterMaxPriorityRequest newInstance() {
RefreshClusterMaxPriorityRequest request =
Records.newRecord(RefreshClusterMaxPriorityRequest.class);
return request;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@Private
@Unstable
public abstract class RefreshClusterMaxPriorityResponse {
@Private
@Unstable
public static RefreshClusterMaxPriorityResponse newInstance() {
RefreshClusterMaxPriorityResponse response =
Records.newRecord(RefreshClusterMaxPriorityResponse.class);
return response;
}
}

View File

@ -43,4 +43,5 @@ service ResourceManagerAdministrationProtocolService {
rpc removeFromClusterNodeLabels(RemoveFromClusterNodeLabelsRequestProto) returns (RemoveFromClusterNodeLabelsResponseProto);
rpc replaceLabelsOnNodes(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto);
rpc checkForDecommissioningNodes(CheckForDecommissioningNodesRequestProto) returns (CheckForDecommissioningNodesResponseProto);
rpc refreshClusterMaxPriority(RefreshClusterMaxPriorityRequestProto) returns (RefreshClusterMaxPriorityResponseProto);
}

View File

@ -107,6 +107,11 @@ message CheckForDecommissioningNodesResponseProto {
repeated NodeIdProto decommissioningNodes = 1;
}
message RefreshClusterMaxPriorityRequestProto {
}
message RefreshClusterMaxPriorityResponseProto {
}
message NodeIdToLabelsNameProto {
optional NodeIdProto nodeId = 1;
repeated string nodeLabels = 2;

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsR
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
@ -132,6 +133,9 @@ public class RMAdminCLI extends HAAdmin {
+ " (instead of NFS or HDFS), this option will only work"
+
" when the command run on the machine where RM is running."))
.put("-refreshClusterMaxPriority",
new UsageInfo("",
"Refresh cluster max priority"))
.build();
public RMAdminCLI() {
@ -381,6 +385,15 @@ public class RMAdminCLI extends HAAdmin {
adminProtocol.refreshServiceAcls(request);
return 0;
}
private int refreshClusterMaxPriority() throws IOException, YarnException {
// Refresh cluster max priority
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
RefreshClusterMaxPriorityRequest request =
recordFactory.newRecordInstance(RefreshClusterMaxPriorityRequest.class);
adminProtocol.refreshClusterMaxPriority(request);
return 0;
}
private int getGroups(String[] usernames) throws IOException {
// Get groups users belongs to
@ -676,6 +689,8 @@ public class RMAdminCLI extends HAAdmin {
exitCode = refreshAdminAcls();
} else if ("-refreshServiceAcl".equals(cmd)) {
exitCode = refreshServiceAcls();
} else if ("-refreshClusterMaxPriority".equals(cmd)) {
exitCode = refreshClusterMaxPriority();
} else if ("-getGroups".equals(cmd)) {
String[] usernames = Arrays.copyOfRange(args, i, args.length);
exitCode = getGroups(usernames);

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsR
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
@ -170,6 +171,14 @@ public class TestRMAdminCLI {
verify(admin).refreshAdminAcls(any(RefreshAdminAclsRequest.class));
}
@Test(timeout = 5000)
public void testRefreshClusterMaxPriority() throws Exception {
String[] args = { "-refreshClusterMaxPriority" };
assertEquals(0, rmAdminCLI.run(args));
verify(admin).refreshClusterMaxPriority(
any(RefreshClusterMaxPriorityRequest.class));
}
@Test(timeout=500)
public void testRefreshServiceAcl() throws Exception {
String[] args = { "-refreshServiceAcl" };

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Check
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto;
@ -50,6 +51,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioning
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
@ -72,6 +75,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommi
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshQueuesRequestPBImpl;
@ -284,4 +289,19 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour
return null;
}
}
@Override
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
RefreshClusterMaxPriorityRequest request) throws YarnException,
IOException {
RefreshClusterMaxPriorityRequestProto requestProto =
((RefreshClusterMaxPriorityRequestPBImpl) request).getProto();
try {
return new RefreshClusterMaxPriorityResponsePBImpl(
proxy.refreshClusterMaxPriority(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
}

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGr
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
@ -52,6 +54,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsR
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
@ -66,6 +70,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommi
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshQueuesRequestPBImpl;
@ -291,4 +297,21 @@ public class ResourceManagerAdministrationProtocolPBServiceImpl implements Resou
throw new ServiceException(e);
}
}
@Override
public RefreshClusterMaxPriorityResponseProto refreshClusterMaxPriority(
RpcController arg0, RefreshClusterMaxPriorityRequestProto proto)
throws ServiceException {
RefreshClusterMaxPriorityRequest request =
new RefreshClusterMaxPriorityRequestPBImpl(proto);
try {
RefreshClusterMaxPriorityResponse response =
real.refreshClusterMaxPriority(request);
return ((RefreshClusterMaxPriorityResponsePBImpl) response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class RefreshClusterMaxPriorityRequestPBImpl
extends
RefreshClusterMaxPriorityRequest {
RefreshClusterMaxPriorityRequestProto proto =
RefreshClusterMaxPriorityRequestProto.getDefaultInstance();
RefreshClusterMaxPriorityRequestProto.Builder builder = null;
boolean viaProto = false;
public RefreshClusterMaxPriorityRequestPBImpl() {
builder = RefreshClusterMaxPriorityRequestProto.newBuilder();
}
public RefreshClusterMaxPriorityRequestPBImpl(
RefreshClusterMaxPriorityRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public RefreshClusterMaxPriorityRequestProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityResponseProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import com.google.protobuf.TextFormat;
@Private
@Unstable
public class RefreshClusterMaxPriorityResponsePBImpl extends
RefreshClusterMaxPriorityResponse {
RefreshClusterMaxPriorityResponseProto proto =
RefreshClusterMaxPriorityResponseProto.getDefaultInstance();
RefreshClusterMaxPriorityResponseProto.Builder builder = null;
boolean viaProto = false;
public RefreshClusterMaxPriorityResponsePBImpl() {
builder = RefreshClusterMaxPriorityResponseProto.newBuilder();
}
public RefreshClusterMaxPriorityResponsePBImpl(
RefreshClusterMaxPriorityResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public RefreshClusterMaxPriorityResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -66,6 +66,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioning
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
@ -613,6 +615,7 @@ public class AdminService extends CompositeService implements
false)) {
refreshServiceAcls(RefreshServiceAclsRequest.newInstance());
}
refreshClusterMaxPriority(RefreshClusterMaxPriorityRequest.newInstance());
} catch (Exception ex) {
throw new ServiceFailedException(ex.getMessage());
}
@ -742,4 +745,29 @@ public class AdminService extends CompositeService implements
response.setDecommissioningNodes(decommissioningNodes);
return response;
}
@Override
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
RefreshClusterMaxPriorityRequest request) throws YarnException,
IOException {
String argName = "refreshClusterMaxPriority";
String msg = "refresh cluster max priority";
UserGroupInformation user = checkAcls(argName);
checkRMStatus(user.getShortUserName(), argName, msg);
try {
Configuration conf =
getConfiguration(new Configuration(false),
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
rmContext.getScheduler().setClusterMaxPriority(conf);
RMAuditLogger
.logSuccess(user.getShortUserName(), argName, "AdminService");
return recordFactory
.newRecordInstance(RefreshClusterMaxPriorityResponse.class);
} catch (YarnException e) {
throw logAndWrapException(e, user.getShortUserName(), argName, msg);
}
}
}

View File

@ -99,6 +99,8 @@ public abstract class AbstractYarnScheduler
protected RMContext rmContext;
private volatile Priority maxClusterLevelAppPriority;
/*
* All schedulers which are inheriting AbstractYarnScheduler should use
* concurrent version of 'applications' map.
@ -131,6 +133,7 @@ public abstract class AbstractYarnScheduler
configuredMaximumAllocationWaitTime =
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
createReleaseCache();
super.serviceInit(conf);
}
@ -708,4 +711,26 @@ public abstract class AbstractYarnScheduler
// Dummy Implementation till Application Priority changes are done in
// specific scheduler.
}
public Priority getMaxClusterLevelAppPriority() {
return maxClusterLevelAppPriority;
}
private Priority getMaxPriorityFromConf(Configuration conf) {
return Priority.newInstance(conf.getInt(
YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
YarnConfiguration.DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY));
}
@Override
public synchronized void setClusterMaxPriority(Configuration conf)
throws YarnException {
try {
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
} catch (NumberFormatException e) {
throw new YarnException(e);
}
LOG.info("Updated the cluste max priority to maxClusterLevelAppPriority = "
+ maxClusterLevelAppPriority);
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -328,4 +329,12 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @return list of live containers for the given attempt
*/
List<Container> getTransferredContainers(ApplicationAttemptId appAttemptId);
/**
* Set the cluster max priority
*
* @param conf
* @throws YarnException
*/
void setClusterMaxPriority(Configuration conf) throws YarnException;
}

View File

@ -220,7 +220,7 @@ public class CapacityScheduler extends
private RMNodeLabelsManager labelManager;
private SchedulerHealth schedulerHealth = new SchedulerHealth();
long lastNodeUpdateTime;
private Priority maxClusterLevelAppPriority;
/**
* EXPERT
*/
@ -316,9 +316,6 @@ public class CapacityScheduler extends
if (scheduleAsynchronously) {
asyncSchedulerThread = new AsyncScheduleThread(this);
}
maxClusterLevelAppPriority = Priority.newInstance(yarnConf.getInt(
YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
YarnConfiguration.DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY));
LOG.info("Initialized CapacityScheduler with " +
"calculator=" + getResourceCalculator().getClass() + ", " +
@ -1859,10 +1856,6 @@ public class CapacityScheduler extends
.getPriority());
}
public Priority getMaxClusterLevelAppPriority() {
return maxClusterLevelAppPriority;
}
@Override
public void updateApplicationPriority(Priority newPriority,
ApplicationId applicationId) throws YarnException {

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
@ -867,6 +868,39 @@ public class TestRMAdminService {
rm.close();
}
@Test(timeout = 30000)
public void testAdminRefreshClusterMaxPriority() throws Exception,
YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
uploadDefaultConfiguration();
YarnConfiguration yarnConf = new YarnConfiguration();
yarnConf.set(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, "5");
uploadConfiguration(yarnConf, "yarn-site.xml");
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
Assert.assertEquals(5, cs.getMaxClusterLevelAppPriority().getPriority());
yarnConf = new YarnConfiguration();
yarnConf
.set(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, "10");
uploadConfiguration(yarnConf, "yarn-site.xml");
try {
rm.adminService
.refreshClusterMaxPriority(RefreshClusterMaxPriorityRequest
.newInstance());
Assert.assertEquals(10, cs.getMaxClusterLevelAppPriority().getPriority());
} catch (Exception ex) {
fail("Could not refresh cluster max priority.");
}
}
private String writeConfigurationXML(Configuration conf, String confXMLName)
throws IOException {
DataOutputStream output = null;