YARN-3250. Support admin cli interface in for Application Priority. Contributed by Rohith Sharma K S
This commit is contained in:
parent
f97a0f8c2c
commit
a9c8ea71aa
|
@ -184,6 +184,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-4014. Support user cli interface in for Application Priority.
|
||||
(Rohith Sharma K S via jianhe)
|
||||
|
||||
YARN-3250. Support admin cli interface in for Application Priority.
|
||||
(Rohith Sharma K S via jianhe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
|
|
@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioning
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
|
@ -128,4 +130,10 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
|
|||
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
|
||||
throws YarnException, IOException;
|
||||
|
||||
@Private
|
||||
@Idempotent
|
||||
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||
RefreshClusterMaxPriorityRequest request) throws YarnException,
|
||||
IOException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract class RefreshClusterMaxPriorityRequest {
|
||||
@Private
|
||||
@Unstable
|
||||
public static RefreshClusterMaxPriorityRequest newInstance() {
|
||||
RefreshClusterMaxPriorityRequest request =
|
||||
Records.newRecord(RefreshClusterMaxPriorityRequest.class);
|
||||
return request;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract class RefreshClusterMaxPriorityResponse {
|
||||
@Private
|
||||
@Unstable
|
||||
public static RefreshClusterMaxPriorityResponse newInstance() {
|
||||
RefreshClusterMaxPriorityResponse response =
|
||||
Records.newRecord(RefreshClusterMaxPriorityResponse.class);
|
||||
return response;
|
||||
}
|
||||
|
||||
}
|
|
@ -43,4 +43,5 @@ service ResourceManagerAdministrationProtocolService {
|
|||
rpc removeFromClusterNodeLabels(RemoveFromClusterNodeLabelsRequestProto) returns (RemoveFromClusterNodeLabelsResponseProto);
|
||||
rpc replaceLabelsOnNodes(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto);
|
||||
rpc checkForDecommissioningNodes(CheckForDecommissioningNodesRequestProto) returns (CheckForDecommissioningNodesResponseProto);
|
||||
rpc refreshClusterMaxPriority(RefreshClusterMaxPriorityRequestProto) returns (RefreshClusterMaxPriorityResponseProto);
|
||||
}
|
||||
|
|
|
@ -107,6 +107,11 @@ message CheckForDecommissioningNodesResponseProto {
|
|||
repeated NodeIdProto decommissioningNodes = 1;
|
||||
}
|
||||
|
||||
message RefreshClusterMaxPriorityRequestProto {
|
||||
}
|
||||
message RefreshClusterMaxPriorityResponseProto {
|
||||
}
|
||||
|
||||
message NodeIdToLabelsNameProto {
|
||||
optional NodeIdProto nodeId = 1;
|
||||
repeated string nodeLabels = 2;
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsR
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||
|
@ -132,6 +133,9 @@ public class RMAdminCLI extends HAAdmin {
|
|||
+ " (instead of NFS or HDFS), this option will only work"
|
||||
+
|
||||
" when the command run on the machine where RM is running."))
|
||||
.put("-refreshClusterMaxPriority",
|
||||
new UsageInfo("",
|
||||
"Refresh cluster max priority"))
|
||||
.build();
|
||||
|
||||
public RMAdminCLI() {
|
||||
|
@ -381,6 +385,15 @@ public class RMAdminCLI extends HAAdmin {
|
|||
adminProtocol.refreshServiceAcls(request);
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int refreshClusterMaxPriority() throws IOException, YarnException {
|
||||
// Refresh cluster max priority
|
||||
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
|
||||
RefreshClusterMaxPriorityRequest request =
|
||||
recordFactory.newRecordInstance(RefreshClusterMaxPriorityRequest.class);
|
||||
adminProtocol.refreshClusterMaxPriority(request);
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int getGroups(String[] usernames) throws IOException {
|
||||
// Get groups users belongs to
|
||||
|
@ -676,6 +689,8 @@ public class RMAdminCLI extends HAAdmin {
|
|||
exitCode = refreshAdminAcls();
|
||||
} else if ("-refreshServiceAcl".equals(cmd)) {
|
||||
exitCode = refreshServiceAcls();
|
||||
} else if ("-refreshClusterMaxPriority".equals(cmd)) {
|
||||
exitCode = refreshClusterMaxPriority();
|
||||
} else if ("-getGroups".equals(cmd)) {
|
||||
String[] usernames = Arrays.copyOfRange(args, i, args.length);
|
||||
exitCode = getGroups(usernames);
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsR
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||
|
@ -170,6 +171,14 @@ public class TestRMAdminCLI {
|
|||
verify(admin).refreshAdminAcls(any(RefreshAdminAclsRequest.class));
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testRefreshClusterMaxPriority() throws Exception {
|
||||
String[] args = { "-refreshClusterMaxPriority" };
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
verify(admin).refreshClusterMaxPriority(
|
||||
any(RefreshClusterMaxPriorityRequest.class));
|
||||
}
|
||||
|
||||
@Test(timeout=500)
|
||||
public void testRefreshServiceAcl() throws Exception {
|
||||
String[] args = { "-refreshServiceAcl" };
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Check
|
|||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto;
|
||||
|
@ -50,6 +51,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioning
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
|
@ -72,6 +75,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommi
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshQueuesRequestPBImpl;
|
||||
|
@ -284,4 +289,19 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||
RefreshClusterMaxPriorityRequest request) throws YarnException,
|
||||
IOException {
|
||||
RefreshClusterMaxPriorityRequestProto requestProto =
|
||||
((RefreshClusterMaxPriorityRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new RefreshClusterMaxPriorityResponsePBImpl(
|
||||
proxy.refreshClusterMaxPriority(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGr
|
|||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
|
||||
|
@ -52,6 +54,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsR
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||
|
@ -66,6 +70,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommi
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshClusterMaxPriorityResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshQueuesRequestPBImpl;
|
||||
|
@ -291,4 +297,21 @@ public class ResourceManagerAdministrationProtocolPBServiceImpl implements Resou
|
|||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshClusterMaxPriorityResponseProto refreshClusterMaxPriority(
|
||||
RpcController arg0, RefreshClusterMaxPriorityRequestProto proto)
|
||||
throws ServiceException {
|
||||
RefreshClusterMaxPriorityRequest request =
|
||||
new RefreshClusterMaxPriorityRequestPBImpl(proto);
|
||||
try {
|
||||
RefreshClusterMaxPriorityResponse response =
|
||||
real.refreshClusterMaxPriority(request);
|
||||
return ((RefreshClusterMaxPriorityResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class RefreshClusterMaxPriorityRequestPBImpl
|
||||
extends
|
||||
RefreshClusterMaxPriorityRequest {
|
||||
|
||||
RefreshClusterMaxPriorityRequestProto proto =
|
||||
RefreshClusterMaxPriorityRequestProto.getDefaultInstance();
|
||||
RefreshClusterMaxPriorityRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
public RefreshClusterMaxPriorityRequestPBImpl() {
|
||||
builder = RefreshClusterMaxPriorityRequestProto.newBuilder();
|
||||
}
|
||||
|
||||
public RefreshClusterMaxPriorityRequestPBImpl(
|
||||
RefreshClusterMaxPriorityRequestProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public RefreshClusterMaxPriorityRequestProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
return false;
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityResponseProto;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class RefreshClusterMaxPriorityResponsePBImpl extends
|
||||
RefreshClusterMaxPriorityResponse {
|
||||
|
||||
RefreshClusterMaxPriorityResponseProto proto =
|
||||
RefreshClusterMaxPriorityResponseProto.getDefaultInstance();
|
||||
RefreshClusterMaxPriorityResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
public RefreshClusterMaxPriorityResponsePBImpl() {
|
||||
builder = RefreshClusterMaxPriorityResponseProto.newBuilder();
|
||||
}
|
||||
|
||||
public RefreshClusterMaxPriorityResponsePBImpl(
|
||||
RefreshClusterMaxPriorityResponseProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public RefreshClusterMaxPriorityResponseProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
return false;
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
}
|
|
@ -66,6 +66,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioning
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
|
@ -613,6 +615,7 @@ public class AdminService extends CompositeService implements
|
|||
false)) {
|
||||
refreshServiceAcls(RefreshServiceAclsRequest.newInstance());
|
||||
}
|
||||
refreshClusterMaxPriority(RefreshClusterMaxPriorityRequest.newInstance());
|
||||
} catch (Exception ex) {
|
||||
throw new ServiceFailedException(ex.getMessage());
|
||||
}
|
||||
|
@ -742,4 +745,29 @@ public class AdminService extends CompositeService implements
|
|||
response.setDecommissioningNodes(decommissioningNodes);
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||
RefreshClusterMaxPriorityRequest request) throws YarnException,
|
||||
IOException {
|
||||
String argName = "refreshClusterMaxPriority";
|
||||
String msg = "refresh cluster max priority";
|
||||
UserGroupInformation user = checkAcls(argName);
|
||||
|
||||
checkRMStatus(user.getShortUserName(), argName, msg);
|
||||
try {
|
||||
Configuration conf =
|
||||
getConfiguration(new Configuration(false),
|
||||
YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
|
||||
|
||||
rmContext.getScheduler().setClusterMaxPriority(conf);
|
||||
|
||||
RMAuditLogger
|
||||
.logSuccess(user.getShortUserName(), argName, "AdminService");
|
||||
return recordFactory
|
||||
.newRecordInstance(RefreshClusterMaxPriorityResponse.class);
|
||||
} catch (YarnException e) {
|
||||
throw logAndWrapException(e, user.getShortUserName(), argName, msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -99,6 +99,8 @@ public abstract class AbstractYarnScheduler
|
|||
|
||||
protected RMContext rmContext;
|
||||
|
||||
private volatile Priority maxClusterLevelAppPriority;
|
||||
|
||||
/*
|
||||
* All schedulers which are inheriting AbstractYarnScheduler should use
|
||||
* concurrent version of 'applications' map.
|
||||
|
@ -131,6 +133,7 @@ public abstract class AbstractYarnScheduler
|
|||
configuredMaximumAllocationWaitTime =
|
||||
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
|
||||
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
|
||||
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
|
||||
createReleaseCache();
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
@ -708,4 +711,26 @@ public abstract class AbstractYarnScheduler
|
|||
// Dummy Implementation till Application Priority changes are done in
|
||||
// specific scheduler.
|
||||
}
|
||||
|
||||
public Priority getMaxClusterLevelAppPriority() {
|
||||
return maxClusterLevelAppPriority;
|
||||
}
|
||||
|
||||
private Priority getMaxPriorityFromConf(Configuration conf) {
|
||||
return Priority.newInstance(conf.getInt(
|
||||
YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
|
||||
YarnConfiguration.DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setClusterMaxPriority(Configuration conf)
|
||||
throws YarnException {
|
||||
try {
|
||||
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new YarnException(e);
|
||||
}
|
||||
LOG.info("Updated the cluste max priority to maxClusterLevelAppPriority = "
|
||||
+ maxClusterLevelAppPriority);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
@ -328,4 +329,12 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
|||
* @return list of live containers for the given attempt
|
||||
*/
|
||||
List<Container> getTransferredContainers(ApplicationAttemptId appAttemptId);
|
||||
|
||||
/**
|
||||
* Set the cluster max priority
|
||||
*
|
||||
* @param conf
|
||||
* @throws YarnException
|
||||
*/
|
||||
void setClusterMaxPriority(Configuration conf) throws YarnException;
|
||||
}
|
||||
|
|
|
@ -220,7 +220,7 @@ public class CapacityScheduler extends
|
|||
private RMNodeLabelsManager labelManager;
|
||||
private SchedulerHealth schedulerHealth = new SchedulerHealth();
|
||||
long lastNodeUpdateTime;
|
||||
private Priority maxClusterLevelAppPriority;
|
||||
|
||||
/**
|
||||
* EXPERT
|
||||
*/
|
||||
|
@ -316,9 +316,6 @@ public class CapacityScheduler extends
|
|||
if (scheduleAsynchronously) {
|
||||
asyncSchedulerThread = new AsyncScheduleThread(this);
|
||||
}
|
||||
maxClusterLevelAppPriority = Priority.newInstance(yarnConf.getInt(
|
||||
YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
|
||||
YarnConfiguration.DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY));
|
||||
|
||||
LOG.info("Initialized CapacityScheduler with " +
|
||||
"calculator=" + getResourceCalculator().getClass() + ", " +
|
||||
|
@ -1859,10 +1856,6 @@ public class CapacityScheduler extends
|
|||
.getPriority());
|
||||
}
|
||||
|
||||
public Priority getMaxClusterLevelAppPriority() {
|
||||
return maxClusterLevelAppPriority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateApplicationPriority(Priority newPriority,
|
||||
ApplicationId applicationId) throws YarnException {
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.conf.HAUtil;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||
|
@ -867,6 +868,39 @@ public class TestRMAdminService {
|
|||
rm.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testAdminRefreshClusterMaxPriority() throws Exception,
|
||||
YarnException {
|
||||
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
|
||||
|
||||
uploadDefaultConfiguration();
|
||||
YarnConfiguration yarnConf = new YarnConfiguration();
|
||||
yarnConf.set(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, "5");
|
||||
uploadConfiguration(yarnConf, "yarn-site.xml");
|
||||
|
||||
rm = new MockRM(configuration);
|
||||
rm.init(configuration);
|
||||
rm.start();
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
|
||||
Assert.assertEquals(5, cs.getMaxClusterLevelAppPriority().getPriority());
|
||||
|
||||
yarnConf = new YarnConfiguration();
|
||||
yarnConf
|
||||
.set(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, "10");
|
||||
uploadConfiguration(yarnConf, "yarn-site.xml");
|
||||
|
||||
try {
|
||||
rm.adminService
|
||||
.refreshClusterMaxPriority(RefreshClusterMaxPriorityRequest
|
||||
.newInstance());
|
||||
Assert.assertEquals(10, cs.getMaxClusterLevelAppPriority().getPriority());
|
||||
} catch (Exception ex) {
|
||||
fail("Could not refresh cluster max priority.");
|
||||
}
|
||||
}
|
||||
|
||||
private String writeConfigurationXML(Configuration conf, String confXMLName)
|
||||
throws IOException {
|
||||
DataOutputStream output = null;
|
||||
|
|
Loading…
Reference in New Issue