YARN-5799. Fix Opportunistic Allocation to set the correct value of Node Http Address. (asuresh)

This commit is contained in:
Arun Suresh 2016-10-29 02:03:57 -07:00
parent 1c8ab41e8b
commit aa3cab1eb2
14 changed files with 343 additions and 82 deletions

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
import java.util.List;
@ -58,9 +57,10 @@ public abstract class DistributedSchedulingAllocateResponse {
@Public
@Unstable
public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
public abstract void setNodesForScheduling(
List<RemoteNode> nodesForScheduling);
@Public
@Unstable
public abstract List<NodeId> getNodesForScheduling();
public abstract List<RemoteNode> getNodesForScheduling();
}

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
@ -99,10 +98,11 @@ public abstract class RegisterDistributedSchedulingAMResponse {
@Public
@Unstable
public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
public abstract void setNodesForScheduling(
List<RemoteNode> nodesForScheduling);
@Public
@Unstable
public abstract List<NodeId> getNodesForScheduling();
public abstract List<RemoteNode> getNodesForScheduling();
}

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
/**
* This class is used to encapsulate the {@link NodeId} as well as the HTTP
* address that can be used to communicate with the Node.
*/
@Private
@Unstable
public abstract class RemoteNode implements Comparable<RemoteNode> {
/**
* Create new Instance.
* @param nodeId NodeId.
* @param httpAddress Http address.
* @return RemoteNode instance.
*/
@Private
@Unstable
public static RemoteNode newInstance(NodeId nodeId, String httpAddress) {
RemoteNode remoteNode = Records.newRecord(RemoteNode.class);
remoteNode.setNodeId(nodeId);
remoteNode.setHttpAddress(httpAddress);
return remoteNode;
}
/**
* Get {@link NodeId}.
* @return NodeId.
*/
@Private
@Unstable
public abstract NodeId getNodeId();
/**
* Set {@link NodeId}.
* @param nodeId NodeId.
*/
@Private
@Unstable
public abstract void setNodeId(NodeId nodeId);
/**
* Get HTTP address.
* @return Http Address.
*/
@Private
@Unstable
public abstract String getHttpAddress();
/**
* Set HTTP address.
* @param httpAddress HTTP address.
*/
@Private
@Unstable
public abstract void setHttpAddress(String httpAddress);
/**
* Use the underlying {@link NodeId} comparator.
* @param other RemoteNode.
* @return Comparison.
*/
@Override
public int compareTo(RemoteNode other) {
return this.getNodeId().compareTo(other.getNodeId());
}
}

View File

@ -21,12 +21,13 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -45,7 +46,7 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
boolean viaProto = false;
private AllocateResponse allocateResponse;
private List<NodeId> nodesForScheduling;
private List<RemoteNode> nodesForScheduling;
public DistributedSchedulingAllocateResponsePBImpl() {
builder = YarnServerCommonServiceProtos.
@ -86,8 +87,8 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
private synchronized void mergeLocalToBuilder() {
if (this.nodesForScheduling != null) {
builder.clearNodesForScheduling();
Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable(
this.nodesForScheduling);
Iterable<YarnServerCommonServiceProtos.RemoteNodeProto> iterable =
getNodeIdProtoIterable(this.nodesForScheduling);
builder.addAllNodesForScheduling(iterable);
}
if (this.allocateResponse != null) {
@ -123,7 +124,7 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
}
@Override
public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
public void setNodesForScheduling(List<RemoteNode> nodesForScheduling) {
maybeInitBuilder();
if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
if (this.nodesForScheduling != null) {
@ -137,7 +138,7 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
}
@Override
public List<NodeId> getNodesForScheduling() {
public List<RemoteNode> getNodesForScheduling() {
if (nodesForScheduling != null) {
return nodesForScheduling;
}
@ -149,24 +150,25 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
YarnServerCommonServiceProtos.
DistributedSchedulingAllocateResponseProtoOrBuilder p =
viaProto ? proto : builder;
List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
List<YarnServerCommonServiceProtos.RemoteNodeProto> list =
p.getNodesForSchedulingList();
nodesForScheduling = new ArrayList<>();
if (list != null) {
for (YarnProtos.NodeIdProto t : list) {
nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
for (YarnServerCommonServiceProtos.RemoteNodeProto t : list) {
nodesForScheduling.add(new RemoteNodePBImpl(t));
}
}
}
private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
final List<NodeId> nodeList) {
private synchronized Iterable<RemoteNodeProto> getNodeIdProtoIterable(
final List<RemoteNode> nodeList) {
maybeInitBuilder();
return new Iterable<YarnProtos.NodeIdProto>() {
return new Iterable<RemoteNodeProto>() {
@Override
public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
return new Iterator<YarnProtos.NodeIdProto>() {
public synchronized Iterator<RemoteNodeProto> iterator() {
return new Iterator<RemoteNodeProto>() {
Iterator<NodeId> iter = nodeList.iterator();
Iterator<RemoteNode> iter = nodeList.iterator();
@Override
public boolean hasNext() {
@ -174,8 +176,8 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
}
@Override
public YarnProtos.NodeIdProto next() {
return ProtoUtils.convertToProtoFormat(iter.next());
public RemoteNodeProto next() {
return ((RemoteNodePBImpl)iter.next()).getProto();
}
@Override
@ -186,5 +188,4 @@ public class DistributedSchedulingAllocateResponsePBImpl extends
}
};
}
}

View File

@ -23,13 +23,15 @@ import com.google.protobuf.TextFormat;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -52,7 +54,7 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
private Resource maxContainerResource;
private Resource minContainerResource;
private Resource incrContainerResource;
private List<NodeId> nodesForScheduling;
private List<RemoteNode> nodesForScheduling;
private RegisterApplicationMasterResponse registerApplicationMasterResponse;
public RegisterDistributedSchedulingAMResponsePBImpl() {
@ -95,8 +97,8 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
private synchronized void mergeLocalToBuilder() {
if (this.nodesForScheduling != null) {
builder.clearNodesForScheduling();
Iterable<YarnProtos.NodeIdProto> iterable = getNodeIdProtoIterable(
this.nodesForScheduling);
Iterable<YarnServerCommonServiceProtos.RemoteNodeProto> iterable =
getNodeIdProtoIterable(this.nodesForScheduling);
builder.addAllNodesForScheduling(iterable);
}
if (this.maxContainerResource != null) {
@ -261,7 +263,7 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
}
@Override
public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
public void setNodesForScheduling(List<RemoteNode> nodesForScheduling) {
maybeInitBuilder();
if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
if (this.nodesForScheduling != null) {
@ -275,7 +277,7 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
}
@Override
public List<NodeId> getNodesForScheduling() {
public List<RemoteNode> getNodesForScheduling() {
if (nodesForScheduling != null) {
return nodesForScheduling;
}
@ -287,24 +289,25 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
YarnServerCommonServiceProtos.
RegisterDistributedSchedulingAMResponseProtoOrBuilder p =
viaProto ? proto : builder;
List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
List<YarnServerCommonServiceProtos.RemoteNodeProto> list =
p.getNodesForSchedulingList();
nodesForScheduling = new ArrayList<>();
if (list != null) {
for (YarnProtos.NodeIdProto t : list) {
nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
for (YarnServerCommonServiceProtos.RemoteNodeProto t : list) {
nodesForScheduling.add(new RemoteNodePBImpl(t));
}
}
}
private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
final List<NodeId> nodeList) {
private synchronized Iterable<RemoteNodeProto> getNodeIdProtoIterable(
final List<RemoteNode> nodeList) {
maybeInitBuilder();
return new Iterable<YarnProtos.NodeIdProto>() {
return new Iterable<RemoteNodeProto>() {
@Override
public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
return new Iterator<YarnProtos.NodeIdProto>() {
public synchronized Iterator<RemoteNodeProto> iterator() {
return new Iterator<RemoteNodeProto>() {
Iterator<NodeId> iter = nodeList.iterator();
Iterator<RemoteNode> iter = nodeList.iterator();
@Override
public boolean hasNext() {
@ -312,8 +315,8 @@ public class RegisterDistributedSchedulingAMResponsePBImpl extends
}
@Override
public YarnProtos.NodeIdProto next() {
return ProtoUtils.convertToProtoFormat(iter.next());
public RemoteNodeProto next() {
return ((RemoteNodePBImpl)iter.next()).getProto();
}
@Override

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RemoteNodeProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
/**
* Implementation of {@link RemoteNode}.
*/
public class RemoteNodePBImpl extends RemoteNode {
private RemoteNodeProto proto = RemoteNodeProto.getDefaultInstance();
private RemoteNodeProto.Builder builder = null;
private boolean viaProto = false;
private NodeId nodeId = null;
public RemoteNodePBImpl() {
builder = RemoteNodeProto.newBuilder();
}
public RemoteNodePBImpl(RemoteNodeProto proto) {
this.proto = proto;
viaProto = true;
}
public RemoteNodeProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.nodeId != null
&& !((NodeIdPBImpl) nodeId).getProto().equals(
builder.getNodeId())) {
builder.setNodeId(ProtoUtils.convertToProtoFormat(this.nodeId));
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RemoteNodeProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public NodeId getNodeId() {
RemoteNodeProtoOrBuilder p = viaProto ? proto : builder;
if (this.nodeId != null) {
return this.nodeId;
}
if (!p.hasNodeId()) {
return null;
}
this.nodeId = ProtoUtils.convertFromProtoFormat(p.getNodeId());
return this.nodeId;
}
@Override
public void setNodeId(NodeId nodeId) {
maybeInitBuilder();
if (nodeId == null) {
builder.clearNodeId();
}
this.nodeId = nodeId;
}
@Override
public String getHttpAddress() {
RemoteNodeProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasHttpAddress()) {
return null;
}
return (p.getHttpAddress());
}
@Override
public void setHttpAddress(String httpAddress) {
maybeInitBuilder();
if (httpAddress == null) {
builder.clearHttpAddress();
return;
}
builder.setHttpAddress(httpAddress);
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@ -174,17 +175,14 @@ public class OpportunisticContainerAllocator {
new DominantResourceCalculator();
private final BaseContainerTokenSecretManager tokenSecretManager;
private int webpagePort;
/**
* Create a new Opportunistic Container Allocator.
* @param tokenSecretManager TokenSecretManager
* @param webpagePort Webpage Port
*/
public OpportunisticContainerAllocator(
BaseContainerTokenSecretManager tokenSecretManager, int webpagePort) {
BaseContainerTokenSecretManager tokenSecretManager) {
this.tokenSecretManager = tokenSecretManager;
this.webpagePort = webpagePort;
}
/**
@ -271,15 +269,15 @@ public class OpportunisticContainerAllocator {
private void allocateContainersInternal(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
Set<String> blacklist, ApplicationAttemptId id,
Map<String, NodeId> allNodes, String userName,
Map<String, RemoteNode> allNodes, String userName,
Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
throws YarnException {
int toAllocate = anyAsk.getNumContainers()
- (containers.isEmpty() ? 0 :
containers.get(anyAsk.getCapability()).size());
List<NodeId> nodesForScheduling = new ArrayList<>();
for (Entry<String, NodeId> nodeEntry : allNodes.entrySet()) {
List<RemoteNode> nodesForScheduling = new ArrayList<>();
for (Entry<String, RemoteNode> nodeEntry : allNodes.entrySet()) {
// Do not use blacklisted nodes for scheduling.
if (blacklist.contains(nodeEntry.getKey())) {
continue;
@ -295,9 +293,9 @@ public class OpportunisticContainerAllocator {
for (int numCont = 0; numCont < toAllocate; numCont++) {
nextNodeToSchedule++;
nextNodeToSchedule %= nodesForScheduling.size();
NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
RemoteNode node = nodesForScheduling.get(nextNodeToSchedule);
Container container = buildContainer(rmIdentifier, appParams, idCounter,
anyAsk, id, userName, nodeId);
anyAsk, id, userName, node);
List<Container> cList = containers.get(anyAsk.getCapability());
if (cList == null) {
cList = new ArrayList<>();
@ -313,7 +311,7 @@ public class OpportunisticContainerAllocator {
private Container buildContainer(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
ResourceRequest rr, ApplicationAttemptId id, String userName,
NodeId nodeId) throws YarnException {
RemoteNode node) throws YarnException {
ContainerId cId =
ContainerId.newContainerId(id, idCounter.generateContainerId());
@ -324,7 +322,7 @@ public class OpportunisticContainerAllocator {
long currTime = System.currentTimeMillis();
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(
cId, 0, nodeId.getHost() + ":" + nodeId.getPort(), userName,
cId, 0, node.getNodeId().toString(), userName,
capability, currTime + appParams.containerTokenExpiryInterval,
tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
rr.getPriority(), currTime,
@ -332,10 +330,10 @@ public class OpportunisticContainerAllocator {
ExecutionType.OPPORTUNISTIC);
byte[] pwd =
tokenSecretManager.createPassword(containerTokenIdentifier);
Token containerToken = newContainerToken(nodeId, pwd,
Token containerToken = newContainerToken(node.getNodeId(), pwd,
containerTokenIdentifier);
Container container = BuilderUtils.newContainer(
cId, nodeId, nodeId.getHost() + ":" + webpagePort,
cId, node.getNodeId(), node.getHttpAddress(),
capability, rr.getPriority(), containerToken,
containerTokenIdentifier.getExecutionType(),
rr.getAllocationRequestId());

View File

@ -23,10 +23,10 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -60,8 +60,8 @@ public class OpportunisticContainerContext {
private ContainerIdGenerator containerIdGenerator =
new ContainerIdGenerator();
private volatile List<NodeId> nodeList = new LinkedList<>();
private final Map<String, NodeId> nodeMap = new LinkedHashMap<>();
private volatile List<RemoteNode> nodeList = new LinkedList<>();
private final Map<String, RemoteNode> nodeMap = new LinkedHashMap<>();
private final Set<String> blacklist = new HashSet<>();
@ -89,11 +89,11 @@ public class OpportunisticContainerContext {
this.containerIdGenerator = containerIdGenerator;
}
public Map<String, NodeId> getNodeMap() {
public Map<String, RemoteNode> getNodeMap() {
return Collections.unmodifiableMap(nodeMap);
}
public synchronized void updateNodeList(List<NodeId> newNodeList) {
public synchronized void updateNodeList(List<RemoteNode> newNodeList) {
// This is an optimization for centralized placement. The
// OppContainerAllocatorAMService has a cached list of nodes which it sets
// here. The nodeMap needs to be updated only if the backing node list is
@ -101,8 +101,8 @@ public class OpportunisticContainerContext {
if (newNodeList != nodeList) {
nodeList = newNodeList;
nodeMap.clear();
for (NodeId n : nodeList) {
nodeMap.put(n.getHost(), n);
for (RemoteNode n : nodeList) {
nodeMap.put(n.getNodeId().getHost(), n);
}
}
}

View File

@ -26,6 +26,11 @@ import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
import "yarn_service_protos.proto";
message RemoteNodeProto {
optional NodeIdProto node_id = 1;
optional string http_address = 2;
}
message RegisterDistributedSchedulingAMResponseProto {
optional RegisterApplicationMasterResponseProto register_response = 1;
optional ResourceProto max_container_resource = 2;
@ -33,12 +38,12 @@ message RegisterDistributedSchedulingAMResponseProto {
optional ResourceProto incr_container_resource = 4;
optional int32 container_token_expiry_interval = 5;
optional int64 container_id_start = 6;
repeated NodeIdProto nodes_for_scheduling = 7;
repeated RemoteNodeProto nodes_for_scheduling = 7;
}
message DistributedSchedulingAllocateResponseProto {
optional AllocateResponseProto allocate_response = 1;
repeated NodeIdProto nodes_for_scheduling = 2;
repeated RemoteNodeProto nodes_for_scheduling = 2;
}
message DistributedSchedulingAllocateRequestProto {

View File

@ -374,7 +374,7 @@ public class NodeManager extends CompositeService
((NMContext) context).setQueueableContainerAllocator(
new OpportunisticContainerAllocator(
context.getContainerTokenSecretManager(), webServer.getPort()));
context.getContainerTokenSecretManager()));
dispatcher.register(ContainerManagerEventType.class, containerManager);
dispatcher.register(NodeManagerEventType.class, this);

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@ -198,7 +199,7 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
setNodeList(registerResponse.getNodesForScheduling());
}
private void setNodeList(List<NodeId> nodeList) {
private void setNodeList(List<RemoteNode> nodeList) {
oppContainerContext.updateNodeList(nodeList);
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
@ -74,7 +75,8 @@ public class TestDistributedScheduler {
RequestInterceptor finalReqIntcptr = setup(conf, distributedScheduler);
registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList(
NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));
RemoteNode.newInstance(NodeId.newInstance("a", 1), "http://a:1"),
RemoteNode.newInstance(NodeId.newInstance("b", 2), "http://b:2")));
final AtomicBoolean flipFlag = new AtomicBoolean(true);
Mockito.when(
@ -87,10 +89,16 @@ public class TestDistributedScheduler {
flipFlag.set(!flipFlag.get());
if (flipFlag.get()) {
return createAllocateResponse(Arrays.asList(
NodeId.newInstance("c", 3), NodeId.newInstance("d", 4)));
RemoteNode.newInstance(
NodeId.newInstance("c", 3), "http://c:3"),
RemoteNode.newInstance(
NodeId.newInstance("d", 4), "http://d:4")));
} else {
return createAllocateResponse(Arrays.asList(
NodeId.newInstance("d", 4), NodeId.newInstance("c", 3)));
RemoteNode.newInstance(
NodeId.newInstance("d", 4), "http://d:4"),
RemoteNode.newInstance(
NodeId.newInstance("c", 3), "http://c:3")));
}
}
});
@ -164,7 +172,7 @@ public class TestDistributedScheduler {
}
private void registerAM(DistributedScheduler distributedScheduler,
RequestInterceptor finalReqIntcptr, List<NodeId> nodeList)
RequestInterceptor finalReqIntcptr, List<RemoteNode> nodeList)
throws Exception {
RegisterDistributedSchedulingAMResponse distSchedRegisterResponse =
Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
@ -208,7 +216,7 @@ public class TestDistributedScheduler {
};
nmContainerTokenSecretManager.setMasterKey(mKey);
OpportunisticContainerAllocator containerAllocator =
new OpportunisticContainerAllocator(nmContainerTokenSecretManager, 77);
new OpportunisticContainerAllocator(nmContainerTokenSecretManager);
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
new NMTokenSecretManagerInNM();
@ -236,7 +244,7 @@ public class TestDistributedScheduler {
}
private DistributedSchedulingAllocateResponse createAllocateResponse(
List<NodeId> nodes) {
List<RemoteNode> nodes) {
DistributedSchedulingAllocateResponse distSchedAllocateResponse =
Records.newRecord(DistributedSchedulingAllocateResponse.class);
distSchedAllocateResponse

View File

@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@ -74,6 +76,7 @@ import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
/**
@ -97,7 +100,7 @@ public class OpportunisticContainerAllocatorAMService
private final int k;
private final long cacheRefreshInterval;
private List<NodeId> cachedNodeIds;
private List<RemoteNode> cachedNodes;
private long lastCacheUpdateTime;
public OpportunisticContainerAllocatorAMService(RMContext rmContext,
@ -105,7 +108,7 @@ public class OpportunisticContainerAllocatorAMService
super(OpportunisticContainerAllocatorAMService.class.getName(),
rmContext, scheduler);
this.oppContainerAllocator = new OpportunisticContainerAllocator(
rmContext.getContainerTokenSecretManager(), 0);
rmContext.getContainerTokenSecretManager());
this.k = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
@ -372,14 +375,29 @@ public class OpportunisticContainerAllocatorAMService
);
}
private synchronized List<NodeId> getLeastLoadedNodes() {
private synchronized List<RemoteNode> getLeastLoadedNodes() {
long currTime = System.currentTimeMillis();
if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
|| cachedNodeIds == null) {
cachedNodeIds = this.nodeMonitor.selectLeastLoadedNodes(this.k);
|| cachedNodes == null) {
cachedNodes = convertToRemoteNodes(
this.nodeMonitor.selectLeastLoadedNodes(this.k));
lastCacheUpdateTime = currTime;
}
return cachedNodeIds;
return cachedNodes;
}
private List<RemoteNode> convertToRemoteNodes(List<NodeId> nodeIds) {
ArrayList<RemoteNode> retNodes = new ArrayList<>();
for (NodeId nId : nodeIds) {
retNodes.add(convertToRemoteNode(nId));
}
return retNodes;
}
private RemoteNode convertToRemoteNode(NodeId nodeId) {
return RemoteNode.newInstance(nodeId,
((AbstractYarnScheduler)rmContext.getScheduler()).getNode(nodeId)
.getHttpAddress());
}
private Resource createMaxContainerResource() {

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
@ -190,7 +191,7 @@ public class TestOpportunisticContainerAllocatorAMService {
dsProxy.allocateForDistributedScheduling(null,
distAllReq.getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
"h1", dsAllocResp.getNodesForScheduling().get(0).getNodeId().getHost());
FinishApplicationMasterResponse dsfinishResp =
new FinishApplicationMasterResponsePBImpl(
@ -269,7 +270,8 @@ public class TestOpportunisticContainerAllocatorAMService {
DistributedSchedulingAllocateResponse resp = factory
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
resp.setNodesForScheduling(
Arrays.asList(NodeId.newInstance("h1", 1234)));
Arrays.asList(RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "http://h1:4321")));
return resp;
}
};