YARN-312. Introduced ResourceManagerAdministrationProtocol changes to support changing resources on node. Contributed by Junping Du.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1551403 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0d7b6dfa97
commit
49ad07af97
|
@ -49,6 +49,9 @@ Release 2.4.0 - UNRELEASED
|
|||
YARN-1448. AM-RM protocol changes to support container resizing (Wangda Tan
|
||||
via Sandy Ryza)
|
||||
|
||||
YARN-312. Introduced ResourceManagerAdministrationProtocol changes to support
|
||||
changing resources on node. (Junping Du via vinodkv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
|
||||
|
|
|
@ -62,4 +62,10 @@ public abstract class ResourceOption {
|
|||
|
||||
protected abstract void build();
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Resource:" + getResource().toString()
|
||||
+ ", overCommitTimeout:" + getOverCommitTimeout();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,8 +22,11 @@ import java.io.IOException;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.tools.GetUserMappingsProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
|
@ -38,6 +41,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
|
||||
@Private
|
||||
@Stable
|
||||
|
@ -77,4 +82,24 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
|
|||
public RefreshServiceAclsResponse refreshServiceAcls(
|
||||
RefreshServiceAclsRequest request)
|
||||
throws YarnException, IOException;
|
||||
|
||||
/**
|
||||
* <p>The interface used by admin to update nodes' resources to the
|
||||
* <code>ResourceManager</code> </p>.
|
||||
*
|
||||
* <p>The admin client is required to provide details such as a map from
|
||||
* {@link NodeId} to {@link ResourceOption} required to update resources on
|
||||
* a list of <code>RMNode</code> in <code>ResourceManager</code> etc.
|
||||
* via the {@link UpdateNodeResourceRequest}.</p>
|
||||
*
|
||||
* @param request request to update resource for a node in cluster.
|
||||
* @return (empty) response on accepting update.
|
||||
* @throws YarnException
|
||||
* @throws IOException
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public UpdateNodeResourceResponse updateNodeResource(
|
||||
UpdateNodeResourceRequest request)
|
||||
throws YarnException, IOException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* <p>The request sent by admin to change a list of nodes' resource to the
|
||||
* <code>ResourceManager</code>.</p>
|
||||
*
|
||||
* <p>The request contains details such as a map from {@link NodeId} to
|
||||
* {@link ResourceOption} for updating the RMNodes' resources in
|
||||
* <code>ResourceManager</code>.
|
||||
*
|
||||
* @see ResourceManagerAdministrationProtocol#updateNodeResource(
|
||||
* UpdateNodeResourceRequest)
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract class UpdateNodeResourceRequest {
|
||||
|
||||
|
||||
@Public
|
||||
@Evolving
|
||||
public static UpdateNodeResourceRequest newInstance(
|
||||
Map<NodeId, ResourceOption> nodeResourceMap) {
|
||||
UpdateNodeResourceRequest request =
|
||||
Records.newRecord(UpdateNodeResourceRequest.class);
|
||||
request.setNodeResourceMap(nodeResourceMap);
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the map from <code>NodeId</code> to <code>ResourceOption</code>.
|
||||
* @return the map of <NodeId, ResourceOption>
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract Map<NodeId, ResourceOption> getNodeResourceMap();
|
||||
|
||||
/**
|
||||
* Set the map from <code>NodeId</code> to <code>ResourceOption</code>.
|
||||
* @param nodeResourceMap the map of <NodeId, ResourceOption>
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract void setNodeResourceMap(Map<NodeId, ResourceOption> nodeResourceMap);
|
||||
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
|
||||
/**
|
||||
* <p>The response sent by the <code>ResourceManager</code> to Admin client on
|
||||
* node resource change.</p>
|
||||
*
|
||||
* <p>Currently, this is empty.</p>
|
||||
*
|
||||
* @see ResourceManagerAdministrationProtocol#updateNodeResource(
|
||||
* UpdateNodeResourceRequest)
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public interface UpdateNodeResourceResponse {
|
||||
|
||||
}
|
|
@ -38,4 +38,5 @@ service ResourceManagerAdministrationProtocolService {
|
|||
rpc refreshAdminAcls(RefreshAdminAclsRequestProto) returns (RefreshAdminAclsResponseProto);
|
||||
rpc refreshServiceAcls(RefreshServiceAclsRequestProto) returns (RefreshServiceAclsResponseProto);
|
||||
rpc getGroupsForUser(GetGroupsForUserRequestProto) returns (GetGroupsForUserResponseProto);
|
||||
rpc updateNodeResource (UpdateNodeResourceRequestProto) returns (UpdateNodeResourceResponseProto);
|
||||
}
|
||||
|
|
|
@ -68,6 +68,13 @@ message GetGroupsForUserResponseProto {
|
|||
repeated string groups = 1;
|
||||
}
|
||||
|
||||
message UpdateNodeResourceRequestProto {
|
||||
repeated NodeResourceMapProto node_resource_map = 1;
|
||||
}
|
||||
|
||||
message UpdateNodeResourceResponseProto {
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
////// RM recovery related records /////////////////////////////////////
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -63,6 +63,11 @@ message ResourceOptionProto {
|
|||
optional int32 over_commit_timeout = 2;
|
||||
}
|
||||
|
||||
message NodeResourceMapProto {
|
||||
optional NodeIdProto node_id = 1;
|
||||
optional ResourceOptionProto resource_option = 2;
|
||||
}
|
||||
|
||||
message PriorityProto {
|
||||
optional int32 priority = 1;
|
||||
}
|
||||
|
|
|
@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -160,6 +162,14 @@ public class TestResourceManagerAdministrationProtocolPBClientImpl {
|
|||
assertNotNull(response);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateNodeResource() throws Exception {
|
||||
UpdateNodeResourceRequest request = recordFactory
|
||||
.newRecordInstance(UpdateNodeResourceRequest.class);
|
||||
UpdateNodeResourceResponse response = client.updateNodeResource(request);
|
||||
assertNotNull(response);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshServiceAcls() throws Exception {
|
||||
RefreshServiceAclsRequest request = recordFactory
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Refre
|
|||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
|
@ -51,6 +52,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
|
||||
|
@ -63,6 +66,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUse
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
|
@ -187,4 +192,18 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateNodeResourceResponse updateNodeResource(
|
||||
UpdateNodeResourceRequest request) throws YarnException, IOException {
|
||||
UpdateNodeResourceRequestProto requestProto =
|
||||
((UpdateNodeResourceRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new UpdateNodeResourceResponsePBImpl(proxy.updateNodeResource(null,
|
||||
requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Refre
|
|||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
|
@ -44,6 +46,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
|
||||
|
@ -56,6 +59,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUse
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -184,4 +189,19 @@ public class ResourceManagerAdministrationProtocolPBServiceImpl implements Resou
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateNodeResourceResponseProto updateNodeResource(RpcController controller,
|
||||
UpdateNodeResourceRequestProto proto) throws ServiceException {
|
||||
UpdateNodeResourceRequestPBImpl request =
|
||||
new UpdateNodeResourceRequestPBImpl(proto);
|
||||
try {
|
||||
UpdateNodeResourceResponse response = real.updateNodeResource(request);
|
||||
return ((UpdateNodeResourceResponsePBImpl)response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceOptionPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeResourceMapProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
|
||||
public class UpdateNodeResourceRequestPBImpl extends UpdateNodeResourceRequest {
|
||||
|
||||
UpdateNodeResourceRequestProto proto = UpdateNodeResourceRequestProto.getDefaultInstance();
|
||||
UpdateNodeResourceRequestProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
Map<NodeId, ResourceOption> nodeResourceMap = null;
|
||||
|
||||
public UpdateNodeResourceRequestPBImpl() {
|
||||
builder = UpdateNodeResourceRequestProto.newBuilder();
|
||||
}
|
||||
|
||||
public UpdateNodeResourceRequestPBImpl(UpdateNodeResourceRequestProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<NodeId, ResourceOption> getNodeResourceMap() {
|
||||
initNodeResourceMap();
|
||||
return this.nodeResourceMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNodeResourceMap(Map<NodeId, ResourceOption> nodeResourceMap) {
|
||||
if (nodeResourceMap == null) {
|
||||
return;
|
||||
}
|
||||
initNodeResourceMap();
|
||||
this.nodeResourceMap.clear();
|
||||
this.nodeResourceMap.putAll(nodeResourceMap);
|
||||
}
|
||||
|
||||
public UpdateNodeResourceRequestProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (this.nodeResourceMap != null) {
|
||||
addNodeResourceMap();
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
if (viaProto)
|
||||
maybeInitBuilder();
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private void initNodeResourceMap() {
|
||||
if (this.nodeResourceMap != null) {
|
||||
return;
|
||||
}
|
||||
UpdateNodeResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<NodeResourceMapProto> list = p.getNodeResourceMapList();
|
||||
this.nodeResourceMap = new HashMap<NodeId, ResourceOption>(list
|
||||
.size());
|
||||
for (NodeResourceMapProto nodeResourceProto : list) {
|
||||
this.nodeResourceMap.put(convertFromProtoFormat(nodeResourceProto.getNodeId()),
|
||||
convertFromProtoFormat(nodeResourceProto.getResourceOption()));
|
||||
}
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = UpdateNodeResourceRequestProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
|
||||
return ((NodeIdPBImpl)nodeId).getProto();
|
||||
}
|
||||
|
||||
private NodeId convertFromProtoFormat(NodeIdProto proto) {
|
||||
return new NodeIdPBImpl(proto);
|
||||
}
|
||||
|
||||
private ResourceOptionPBImpl convertFromProtoFormat(ResourceOptionProto c) {
|
||||
return new ResourceOptionPBImpl(c);
|
||||
}
|
||||
|
||||
private ResourceOptionProto convertToProtoFormat(ResourceOption c) {
|
||||
return ((ResourceOptionPBImpl)c).getProto();
|
||||
}
|
||||
|
||||
private void addNodeResourceMap() {
|
||||
maybeInitBuilder();
|
||||
builder.clearNodeResourceMap();
|
||||
if (nodeResourceMap == null) {
|
||||
return;
|
||||
}
|
||||
Iterable<? extends NodeResourceMapProto> values
|
||||
= new Iterable<NodeResourceMapProto>() {
|
||||
|
||||
@Override
|
||||
public Iterator<NodeResourceMapProto> iterator() {
|
||||
return new Iterator<NodeResourceMapProto>() {
|
||||
Iterator<NodeId> nodeIterator = nodeResourceMap
|
||||
.keySet().iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return nodeIterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeResourceMapProto next() {
|
||||
NodeId nodeId = nodeIterator.next();
|
||||
return NodeResourceMapProto.newBuilder().setNodeId(
|
||||
convertToProtoFormat(nodeId)).setResourceOption(
|
||||
convertToProtoFormat(nodeResourceMap.get(nodeId))).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
this.builder.addAllNodeResourceMap(values);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
|
||||
public class UpdateNodeResourceResponsePBImpl implements UpdateNodeResourceResponse {
|
||||
|
||||
UpdateNodeResourceResponseProto proto = UpdateNodeResourceResponseProto.getDefaultInstance();
|
||||
UpdateNodeResourceResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
public UpdateNodeResourceResponsePBImpl() {
|
||||
builder = UpdateNodeResourceResponseProto.newBuilder();
|
||||
}
|
||||
|
||||
public UpdateNodeResourceResponsePBImpl(
|
||||
UpdateNodeResourceResponseProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public UpdateNodeResourceResponseProto getProto() {
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == null)
|
||||
return false;
|
||||
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getProto().toString().replaceAll("\\n", ", ")
|
||||
.replaceAll("\\s+", " ");
|
||||
}
|
||||
|
||||
}
|
|
@ -20,6 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -41,6 +46,8 @@ import org.apache.hadoop.security.authorize.AccessControlList;
|
|||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -61,6 +68,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
@ -377,4 +387,45 @@ public class AdminService extends AbstractService implements
|
|||
public String[] getGroupsForUser(String user) throws IOException {
|
||||
return UserGroupInformation.createRemoteUser(user).getGroupNames();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateNodeResourceResponse updateNodeResource(
|
||||
UpdateNodeResourceRequest request) throws YarnException, IOException {
|
||||
Map<NodeId, ResourceOption> nodeResourceMap = request.getNodeResourceMap();
|
||||
Set<NodeId> nodeIds = nodeResourceMap.keySet();
|
||||
// verify nodes are all valid first.
|
||||
// if any invalid nodes, throw exception instead of partially updating
|
||||
// valid nodes.
|
||||
for (NodeId nodeId : nodeIds) {
|
||||
RMNode node = this.rmContext.getRMNodes().get(nodeId);
|
||||
if (node == null) {
|
||||
LOG.error("Resource update get failed on all nodes due to change "
|
||||
+ "resource on an unrecognized node: " + nodeId);
|
||||
throw RPCUtil.getRemoteException(
|
||||
"Resource update get failed on all nodes due to change resource "
|
||||
+ "on an unrecognized node: " + nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
// do resource update on each node.
|
||||
// Notice: it is still possible to have invalid NodeIDs as nodes decommission
|
||||
// may happen just at the same time. This time, only log and skip absent
|
||||
// nodes without throwing any exceptions.
|
||||
for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) {
|
||||
ResourceOption newResourceOption = entry.getValue();
|
||||
NodeId nodeId = entry.getKey();
|
||||
RMNode node = this.rmContext.getRMNodes().get(nodeId);
|
||||
if (node == null) {
|
||||
LOG.warn("Resource update get failed on an unrecognized node: " + nodeId);
|
||||
} else {
|
||||
node.setResourceOption(newResourceOption);
|
||||
LOG.info("Update resource successfully on node(" + node.getNodeID()
|
||||
+") with resource(" + newResourceOption.toString() + ")");
|
||||
}
|
||||
}
|
||||
UpdateNodeResourceResponse response = recordFactory.newRecordInstance(
|
||||
UpdateNodeResourceResponse.class);
|
||||
return response;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue