YARN-4412. Create ClusterMonitor to compute ordered list of preferred NMs for OPPORTUNITIC containers. (asuresh)

This commit is contained in:
Arun Suresh 2016-04-26 20:12:12 -07:00
parent 68b4564e78
commit 341888a0aa
27 changed files with 1034 additions and 132 deletions

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -190,6 +191,10 @@ public class NodeInfo {
return null;
}
public QueuedContainersStatus getQueuedContainersStatus() {
return null;
}
@Override
public ResourceUtilization getAggregatedContainersUtilization() {
return null;

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -179,6 +180,10 @@ public class RMNodeWrapper implements RMNode {
return Collections.EMPTY_LIST;
}
public QueuedContainersStatus getQueuedContainersStatus() {
return null;
}
@Override
public ResourceUtilization getAggregatedContainersUtilization() {
return node.getAggregatedContainersUtilization();

View File

@ -338,6 +338,23 @@ public class YarnConfiguration extends Configuration {
public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
600000;
/** K least loaded nodes to be provided to the LocalScheduler of a
* NodeManager for Distributed Scheduling */
public static final String DIST_SCHEDULING_TOP_K =
YARN_PREFIX + "distributed-scheduling.top-k";
public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
/** Frequency for computing Top K Best Nodes */
public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS =
YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms";
public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000;
/** Comparator for determining Node Load for Distributed Scheduling */
public static final String DIST_SCHEDULING_TOP_K_COMPARATOR =
YARN_PREFIX + "distributed-scheduling.top-k-comparator";
public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT =
"QUEUE_LENGTH";
/**
* Enable/disable intermediate-data encryption at YARN level. For now, this
* only is used by the FileSystemRMStateStore to setup right file-system

View File

@ -114,6 +114,31 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
// Ignore Distributed Scheduling Related Configurations.
// Since it is still a "work in progress" feature
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_ENABLED);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_INCR_VCORES);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MAX_VCORES);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MIN_VCORES);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR);
// Set by container-executor.cfg
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.event;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
/**
* This is a specialized EventHandler to be used by Services that are expected
* handle a large number of events efficiently by ensuring that the caller
* thread is not blocked. Events are immediately stored in a BlockingQueue and
* a separate dedicated Thread consumes events from the queue and handles
* appropriately
* @param <T> Type of Event
*/
public class EventDispatcher<T extends Event> extends
AbstractService implements EventHandler<T> {
private final EventHandler<T> handler;
private final BlockingQueue<T> eventQueue =
new LinkedBlockingDeque<>();
private final Thread eventProcessor;
private volatile boolean stopped = false;
private boolean shouldExitOnError = false;
private static final Log LOG = LogFactory.getLog(EventDispatcher.class);
private final class EventProcessor implements Runnable {
@Override
public void run() {
T event;
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
return; // TODO: Kill RM.
}
try {
handler.handle(event);
} catch (Throwable t) {
// An error occurred, but we are shutting down anyway.
// If it was an InterruptedException, the very act of
// shutdown could have caused it and is probably harmless.
if (stopped) {
LOG.warn("Exception during shutdown: ", t);
break;
}
LOG.fatal("Error in handling event type " + event.getType()
+ " to the Event Dispatcher", t);
if (shouldExitOnError
&& !ShutdownHookManager.get().isShutdownInProgress()) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
}
}
public EventDispatcher(EventHandler<T> handler, String name) {
super(name);
this.handler = handler;
this.eventProcessor = new Thread(new EventProcessor());
this.eventProcessor.setName(getName() + ":Event Processor");
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.shouldExitOnError =
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
this.eventProcessor.start();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
this.stopped = true;
this.eventProcessor.interrupt();
try {
this.eventProcessor.join();
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
super.serviceStop();
}
@Override
public void handle(T event) {
try {
int qSize = eventQueue.size();
if (qSize !=0 && qSize %1000 == 0) {
LOG.info("Size of " + getName() + " event-queue is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.info("Very low remaining capacity on " + getName() + "" +
"event queue: " + remCapacity);
}
this.eventQueue.put(event);
} catch (InterruptedException e) {
LOG.info("Interrupted. Trying to exit gracefully.");
}
}
}

View File

@ -122,4 +122,13 @@ public abstract class NodeStatus {
@Unstable
public abstract void setIncreasedContainers(
List<Container> increasedContainers);
@Private
@Unstable
public abstract QueuedContainersStatus getQueuedContainersStatus();
@Private
@Unstable
public abstract void setQueuedContainersStatus(
QueuedContainersStatus queuedContainersStatus);
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>
* <code>QueuedContainersStatus</code> captures information pertaining to the
* state of execution of the Queueable containers within a node.
* </p>
*/
@Private
@Evolving
public abstract class QueuedContainersStatus {
public static QueuedContainersStatus newInstance() {
return Records.newRecord(QueuedContainersStatus.class);
}
public abstract int getEstimatedQueueWaitTime();
public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
public abstract int getWaitQueueLength();
public abstract void setWaitQueueLength(int queueWaitTime);
}

View File

@ -33,14 +33,17 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -400,6 +403,27 @@ public class NodeStatusPBImpl extends NodeStatus {
this.increasedContainers = increasedContainers;
}
@Override
public QueuedContainersStatus getQueuedContainersStatus() {
NodeStatusProtoOrBuilder p =
this.viaProto ? this.proto : this.builder;
if (!p.hasQueuedContainerStatus()) {
return null;
}
return convertFromProtoFormat(p.getQueuedContainerStatus());
}
@Override
public void setQueuedContainersStatus(QueuedContainersStatus queuedContainersStatus) {
maybeInitBuilder();
if (queuedContainersStatus == null) {
this.builder.clearQueuedContainerStatus();
return;
}
this.builder.setQueuedContainerStatus(
convertToProtoFormat(queuedContainersStatus));
}
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto();
}
@ -433,15 +457,25 @@ public class NodeStatusPBImpl extends NodeStatus {
return ((ApplicationIdPBImpl)c).getProto();
}
private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
private YarnProtos.ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
return ((ResourceUtilizationPBImpl) r).getProto();
}
private ResourceUtilizationPBImpl convertFromProtoFormat(
ResourceUtilizationProto p) {
YarnProtos.ResourceUtilizationProto p) {
return new ResourceUtilizationPBImpl(p);
}
private YarnServerCommonProtos.QueuedContainersStatusProto convertToProtoFormat(
QueuedContainersStatus r) {
return ((QueuedContainersStatusPBImpl) r).getProto();
}
private QueuedContainersStatus convertFromProtoFormat(
YarnServerCommonProtos.QueuedContainersStatusProto p) {
return new QueuedContainersStatusPBImpl(p);
}
private ContainerPBImpl convertFromProtoFormat(
ContainerProto c) {
return new ContainerPBImpl(c);

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records.impl.pb;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
public class QueuedContainersStatusPBImpl extends QueuedContainersStatus {
private YarnServerCommonProtos.QueuedContainersStatusProto proto =
YarnServerCommonProtos.QueuedContainersStatusProto.getDefaultInstance();
private YarnServerCommonProtos.QueuedContainersStatusProto.Builder builder = null;
private boolean viaProto = false;
public QueuedContainersStatusPBImpl() {
builder = YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder();
}
public QueuedContainersStatusPBImpl(YarnServerCommonProtos
.QueuedContainersStatusProto proto) {
this.proto = proto;
viaProto = true;
}
public YarnServerCommonProtos.QueuedContainersStatusProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder =
YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int getEstimatedQueueWaitTime() {
YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getEstimatedQueueWaitTime();
}
@Override
public void setEstimatedQueueWaitTime(int queueWaitTime) {
maybeInitBuilder();
builder.setEstimatedQueueWaitTime(queueWaitTime);
}
@Override
public int getWaitQueueLength() {
YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getWaitQueueLength();
}
@Override
public void setWaitQueueLength(int waitQueueLength) {
maybeInitBuilder();
builder.setWaitQueueLength(waitQueueLength);
}
}

View File

@ -39,6 +39,12 @@ message NodeStatusProto {
optional ResourceUtilizationProto containers_utilization = 6;
optional ResourceUtilizationProto node_utilization = 7;
repeated ContainerProto increased_containers = 8;
optional QueuedContainersStatusProto queued_container_status = 9;
}
message QueuedContainersStatusProto {
optional int32 estimated_queue_wait_time = 1;
optional int32 wait_queue_length = 2;
}
message MasterKeyProto {

View File

@ -39,8 +39,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
.NodeHeartbeatRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@ -131,4 +137,28 @@ public class TestProtocolRecords {
((NodeHeartbeatResponsePBImpl) record).getProto());
Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
}
@Test
public void testNodeHeartBeatRequest() throws IOException {
NodeHeartbeatRequest record =
Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus nodeStatus =
Records.newRecord(NodeStatus.class);
QueuedContainersStatus queuedContainersStatus = Records.newRecord
(QueuedContainersStatus.class);
queuedContainersStatus.setEstimatedQueueWaitTime(123);
queuedContainersStatus.setWaitQueueLength(321);
nodeStatus.setQueuedContainersStatus(queuedContainersStatus);
record.setNodeStatus(nodeStatus);
NodeHeartbeatRequestPBImpl pb = new
NodeHeartbeatRequestPBImpl(
((NodeHeartbeatRequestPBImpl) record).getProto());
Assert.assertEquals(123,
pb.getNodeStatus()
.getQueuedContainersStatus().getEstimatedQueueWaitTime());
Assert.assertEquals(321,
pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength());
}
}

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@ -449,9 +450,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
createKeepAliveApplicationList(), nodeHealthStatus,
containersUtilization, nodeUtilization, increasedContainers);
nodeStatus.setQueuedContainersStatus(getQueuedContainerStatus());
return nodeStatus;
}
private QueuedContainersStatus getQueuedContainerStatus() {
QueuedContainersStatus status = QueuedContainersStatus.newInstance();
status.setWaitQueueLength(
this.context.getQueuingContext().getQueuedContainers().size());
return status;
}
/**
* Get the aggregated utilization of the containers in this node.
* @return Resource utilization of all the containers.

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import java.util.List;
public interface ClusterMonitor {
void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode);
void removeNode(RMNode removedRMNode);
void nodeUpdate(RMNode rmNode);
void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
}

View File

@ -18,10 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
@ -40,20 +44,62 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security
.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
.TopKNodeSelector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* The DistributedSchedulingService is started instead of the
* ApplicationMasterService if DistributedScheduling is enabled for the YARN
* cluster.
* It extends the functionality of the ApplicationMasterService by servicing
* clients (AMs and AMRMProxy request interceptors) that understand the
* DistributedSchedulingProtocol.
*/
public class DistributedSchedulingService extends ApplicationMasterService
implements DistributedSchedulerProtocol {
implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> {
private static final Log LOG =
LogFactory.getLog(DistributedSchedulingService.class);
private final TopKNodeSelector clusterMonitor;
private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
new ConcurrentHashMap<>();
public DistributedSchedulingService(RMContext rmContext,
YarnScheduler scheduler) {
super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
int k = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.DIST_SCHEDULING_TOP_K,
YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
long topKComputationInterval = rmContext.getYarnConfiguration().getLong(
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS,
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT);
TopKNodeSelector.TopKComparator comparator =
TopKNodeSelector.TopKComparator.valueOf(
rmContext.getYarnConfiguration().get(
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR,
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT));
TopKNodeSelector topKSelector =
new TopKNodeSelector(k, topKComputationInterval, comparator);
this.clusterMonitor = topKSelector;
}
@Override
@ -63,8 +109,9 @@ public class DistributedSchedulingService extends ApplicationMasterService
addr, serverConf, secretManager,
serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
// To support application running no NMs that DO NOT support
// Dist Scheduling...
// To support application running on NMs that DO NOT support
// Dist Scheduling... The server multiplexes both the
// ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
ApplicationMasterProtocolPB.class,
ApplicationMasterProtocolService.newReflectiveBlockingService(
@ -141,10 +188,8 @@ public class DistributedSchedulingService extends ApplicationMasterService
this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
// Set nodes to be used for scheduling
// TODO: The actual computation of the list will happen in YARN-4412
// TODO: Till then, send the complete list
dsResp.setNodesForScheduling(
new ArrayList<>(this.rmContext.getRMNodes().keySet()));
new ArrayList<>(this.clusterMonitor.selectNodes()));
return dsResp;
}
@ -156,7 +201,95 @@ public class DistributedSchedulingService extends ApplicationMasterService
(DistSchedAllocateResponse.class);
dsResp.setAllocateResponse(response);
dsResp.setNodesForScheduling(
new ArrayList<>(this.rmContext.getRMNodes().keySet()));
new ArrayList<>(this.clusterMonitor.selectNodes()));
return dsResp;
}
private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
String rackName, NodeId nodeId) {
if (rackName != null) {
mapping.putIfAbsent(rackName, new HashSet<NodeId>());
Set<NodeId> nodeIds = mapping.get(rackName);
synchronized (nodeIds) {
nodeIds.add(nodeId);
}
}
}
private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
String rackName, NodeId nodeId) {
if (rackName != null) {
Set<NodeId> nodeIds = mapping.get(rackName);
synchronized (nodeIds) {
nodeIds.remove(nodeId);
}
}
}
@Override
public void handle(SchedulerEvent event) {
switch (event.getType()) {
case NODE_ADDED:
if (!(event instanceof NodeAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
clusterMonitor.addNode(nodeAddedEvent.getContainerReports(),
nodeAddedEvent.getAddedRMNode());
addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
nodeAddedEvent.getAddedRMNode().getNodeID());
addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
nodeAddedEvent.getAddedRMNode().getNodeID());
break;
case NODE_REMOVED:
if (!(event instanceof NodeRemovedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeRemovedSchedulerEvent nodeRemovedEvent =
(NodeRemovedSchedulerEvent)event;
clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
removeFromMapping(rackToNode,
nodeRemovedEvent.getRemovedRMNode().getRackName(),
nodeRemovedEvent.getRemovedRMNode().getNodeID());
removeFromMapping(hostToNode,
nodeRemovedEvent.getRemovedRMNode().getHostName(),
nodeRemovedEvent.getRemovedRMNode().getNodeID());
break;
case NODE_UPDATE:
if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode());
break;
case NODE_RESOURCE_UPDATE:
if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
(NodeResourceUpdateSchedulerEvent)event;
clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
nodeResourceUpdatedEvent.getResourceOption());
break;
// <-- IGNORED EVENTS : START -->
case APP_ADDED:
break;
case APP_REMOVED:
break;
case APP_ATTEMPT_ADDED:
break;
case APP_ATTEMPT_REMOVED:
break;
case CONTAINER_EXPIRED:
break;
case NODE_LABELS_UPDATE:
break;
// <-- IGNORED EVENTS : END -->
default:
LOG.error("Unknown event arrived at DistributedSchedulingService: "
+ event.toString());
}
}
}

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ExitUtil;
@ -59,6 +58,7 @@ import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@ -118,8 +118,6 @@ import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* The ResourceManager is the main class that is a set of components.
@ -370,7 +368,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler);
return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
}
protected Dispatcher createDispatcher() {
@ -725,108 +723,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
}
@Private
public static class SchedulerEventDispatcher extends AbstractService
implements EventHandler<SchedulerEvent> {
private final ResourceScheduler scheduler;
private final BlockingQueue<SchedulerEvent> eventQueue =
new LinkedBlockingQueue<SchedulerEvent>();
private volatile int lastEventQueueSizeLogged = 0;
private final Thread eventProcessor;
private volatile boolean stopped = false;
private boolean shouldExitOnError = false;
public SchedulerEventDispatcher(ResourceScheduler scheduler) {
super(SchedulerEventDispatcher.class.getName());
this.scheduler = scheduler;
this.eventProcessor = new Thread(new EventProcessor());
this.eventProcessor.setName("ResourceManager Event Processor");
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.shouldExitOnError =
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
this.eventProcessor.start();
super.serviceStart();
}
private final class EventProcessor implements Runnable {
@Override
public void run() {
SchedulerEvent event;
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
return; // TODO: Kill RM.
}
try {
scheduler.handle(event);
} catch (Throwable t) {
// An error occurred, but we are shutting down anyway.
// If it was an InterruptedException, the very act of
// shutdown could have caused it and is probably harmless.
if (stopped) {
LOG.warn("Exception during shutdown: ", t);
break;
}
LOG.fatal("Error in handling event type " + event.getType()
+ " to the scheduler", t);
if (shouldExitOnError
&& !ShutdownHookManager.get().isShutdownInProgress()) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
}
}
@Override
protected void serviceStop() throws Exception {
this.stopped = true;
this.eventProcessor.interrupt();
try {
this.eventProcessor.join();
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
super.serviceStop();
}
@Override
public void handle(SchedulerEvent event) {
try {
int qSize = eventQueue.size();
if (qSize != 0 && qSize % 1000 == 0
&& lastEventQueueSizeLogged != qSize) {
lastEventQueueSizeLogged = qSize;
LOG.info("Size of scheduler event-queue is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.info("Very low remaining capacity on scheduler event queue: "
+ remCapacity);
}
this.eventQueue.put(event);
} catch (InterruptedException e) {
LOG.info("Interrupted. Trying to exit gracefully.");
}
}
}
@Private
public static class RMFatalEventDispatcher
implements EventHandler<RMFatalEvent> {
@ -1234,7 +1130,19 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (this.rmContext.getYarnConfiguration().getBoolean(
YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
return new DistributedSchedulingService(this.rmContext, scheduler);
DistributedSchedulingService distributedSchedulingService = new
DistributedSchedulingService(this.rmContext, scheduler);
EventDispatcher distSchedulerEventDispatcher =
new EventDispatcher(distributedSchedulingService,
DistributedSchedulingService.class.getName());
// Add an event dispoatcher for the DistributedSchedulingService
// to handle node updates/additions and removals.
// Since the SchedulerEvent is currently a super set of theses,
// we register interest for it..
addService(distSchedulerEventDispatcher);
rmDispatcher.register(SchedulerEventType.class,
distSchedulerEventDispatcher);
return distributedSchedulingService;
}
return new ApplicationMasterService(this.rmContext, scheduler);
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
/**
* Node managers information on available resources
@ -168,4 +169,7 @@ public interface RMNode {
NodeHeartbeatResponse response);
public List<Container> pullNewlyIncreasedContainers();
public QueuedContainersStatus getQueuedContainersStatus();
}

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
@ -125,6 +126,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* Resource utilization for the node. */
private ResourceUtilization nodeUtilization;
/* Container Queue Information for the node.. Used by Distributed Scheduler */
private QueuedContainersStatus queuedContainersStatus;
private final ContainerAllocationExpirer containerAllocationExpirer;
/* set of containers that have just launched */
private final Set<ContainerId> launchedContainers =
@ -1121,7 +1125,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
rmNode.setQueuedContainersStatus(statusEvent.getContainerQueueInfo());
NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
rmNode, statusEvent);
NodeState initialState = rmNode.getState();
@ -1383,4 +1387,25 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public Resource getOriginalTotalCapability() {
return this.originalTotalCapability;
}
}
public QueuedContainersStatus getQueuedContainersStatus() {
this.readLock.lock();
try {
return this.queuedContainersStatus;
} finally {
this.readLock.unlock();
}
}
public void setQueuedContainersStatus(QueuedContainersStatus
queuedContainersStatus) {
this.writeLock.lock();
try {
this.queuedContainersStatus = queuedContainersStatus;
} finally {
this.writeLock.unlock();
}
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -79,6 +80,10 @@ public class RMNodeStatusEvent extends RMNodeEvent {
return this.logAggregationReportsForApps;
}
public QueuedContainersStatus getContainerQueueInfo() {
return this.nodeStatus.getQueuedContainersStatus();
}
public void setLogAggregationReportsForApps(
List<LogAggregationReport> logAggregationReportsForApps) {
this.logAggregationReportsForApps = logAggregationReportsForApps;
@ -89,4 +94,6 @@ public class RMNodeStatusEvent extends RMNodeEvent {
return this.nodeStatus.getIncreasedContainers() == null ?
Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers();
}
}

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TopKNodeSelector implements ClusterMonitor {
final static Log LOG = LogFactory.getLog(TopKNodeSelector.class);
public enum TopKComparator implements Comparator<ClusterNode> {
WAIT_TIME,
QUEUE_LENGTH;
@Override
public int compare(ClusterNode o1, ClusterNode o2) {
if (getQuant(o1) == getQuant(o2)) {
return o1.timestamp < o2.timestamp ? +1 : -1;
}
return getQuant(o1) > getQuant(o2) ? +1 : -1;
}
private int getQuant(ClusterNode c) {
return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength;
}
}
static class ClusterNode {
int queueTime = -1;
int waitQueueLength = 0;
double timestamp;
final NodeId nodeId;
public ClusterNode(NodeId nodeId) {
this.nodeId = nodeId;
updateTimestamp();
}
public ClusterNode setQueueTime(int queueTime) {
this.queueTime = queueTime;
return this;
}
public ClusterNode setWaitQueueLength(int queueLength) {
this.waitQueueLength = queueLength;
return this;
}
public ClusterNode updateTimestamp() {
this.timestamp = System.currentTimeMillis();
return this;
}
}
private final int k;
private final List<NodeId> topKNodes;
private final ScheduledExecutorService scheduledExecutor;
private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>();
private final Comparator<ClusterNode> comparator;
Runnable computeTask = new Runnable() {
@Override
public void run() {
synchronized (topKNodes) {
topKNodes.clear();
topKNodes.addAll(computeTopKNodes());
}
}
};
@VisibleForTesting
TopKNodeSelector(int k, TopKComparator comparator) {
this.k = k;
this.topKNodes = new ArrayList<>();
this.comparator = comparator;
this.scheduledExecutor = null;
}
public TopKNodeSelector(int k, long nodeComputationInterval,
TopKComparator comparator) {
this.k = k;
this.topKNodes = new ArrayList<>();
this.scheduledExecutor = Executors.newScheduledThreadPool(1);
this.comparator = comparator;
this.scheduledExecutor.scheduleAtFixedRate(computeTask,
nodeComputationInterval, nodeComputationInterval,
TimeUnit.MILLISECONDS);
}
@Override
public void addNode(List<NMContainerStatus> containerStatuses, RMNode
rmNode) {
LOG.debug("Node added event from: " + rmNode.getNode().getName());
// Ignoring this currently : atleast one NODE_UPDATE heartbeat is
// required to ensure node eligibility.
}
@Override
public void removeNode(RMNode removedRMNode) {
LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
synchronized (this.clusterNodes) {
if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
this.clusterNodes.remove(removedRMNode.getNodeID());
LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
} else {
LOG.debug("Node not in list!");
}
}
}
@Override
public void nodeUpdate(RMNode rmNode) {
LOG.debug("Node update event from: " + rmNode.getNodeID());
QueuedContainersStatus queuedContainersStatus =
rmNode.getQueuedContainersStatus();
int estimatedQueueWaitTime =
queuedContainersStatus.getEstimatedQueueWaitTime();
int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
// Add nodes to clusterNodes.. if estimatedQueueTime is -1, Ignore node
// UNLESS comparator is based on queue length, in which case, we should add
synchronized (this.clusterNodes) {
ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
if (currentNode == null) {
if (estimatedQueueWaitTime != -1
|| comparator == TopKComparator.QUEUE_LENGTH) {
this.clusterNodes.put(rmNode.getNodeID(),
new ClusterNode(rmNode.getNodeID())
.setQueueTime(estimatedQueueWaitTime)
.setWaitQueueLength(waitQueueLength));
LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
} else {
LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
}
} else {
if (estimatedQueueWaitTime != -1
|| comparator == TopKComparator.QUEUE_LENGTH) {
currentNode
.setQueueTime(estimatedQueueWaitTime)
.setWaitQueueLength(waitQueueLength)
.updateTimestamp();
LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
} else {
this.clusterNodes.remove(rmNode.getNodeID());
LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
"with queue wait time [" + currentNode.queueTime + "] and " +
"wait queue length [" + currentNode.waitQueueLength + "]");
}
}
}
}
@Override
public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
LOG.debug("Node resource update event from: " + rmNode.getNodeID());
// Ignoring this currently...
}
public List<NodeId> selectNodes() {
synchronized (this.topKNodes) {
return this.k < this.topKNodes.size() ?
new ArrayList<>(this.topKNodes).subList(0, this.k) :
new ArrayList<>(this.topKNodes);
}
}
private List<NodeId> computeTopKNodes() {
synchronized (this.clusterNodes) {
ArrayList aList = new ArrayList<>(this.clusterNodes.values());
List<NodeId> retList = new ArrayList<>();
Object[] nodes = aList.toArray();
// Collections.sort would do something similar by calling Arrays.sort
// internally but would finally iterate through the input list (aList)
// to reset the value of each element.. Since we don't really care about
// 'aList', we can use the iteration to create the list of nodeIds which
// is what we ultimately care about.
Arrays.sort(nodes, (Comparator)comparator);
for (int j=0; j < nodes.length; j++) {
retList.add(((ClusterNode)nodes[j]).nodeId);
}
return retList;
}
}
}

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@ -260,6 +260,10 @@ public class MockNodes {
public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization;
}
public QueuedContainersStatus getQueuedContainersStatus() {
return null;
}
};
private static RMNode buildRMNode(int rack, final Resource perNode,

View File

@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -167,10 +169,11 @@ public class TestApplicationCleanup {
MockRM rm = new MockRM() {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) {
return new EventDispatcher<SchedulerEvent>(this.scheduler,
this.scheduler.getClass().getName()) {
@Override
public void handle(SchedulerEvent event) {
scheduler.handle(event);
super.handle(event);
}
};
}

View File

@ -72,6 +72,11 @@ public class TestDistributedSchedulingService {
public AMLivelinessMonitor getAMLivelinessMonitor() {
return null;
}
@Override
public Configuration getYarnConfiguration() {
return new YarnConfiguration();
}
};
DistributedSchedulingService service =
new DistributedSchedulingService(rmContext, null) {

View File

@ -27,7 +27,7 @@ import static org.mockito.Mockito.verify;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
@ -44,8 +44,8 @@ public class TestRMDispatcher {
AsyncDispatcher rmDispatcher = new AsyncDispatcher();
CapacityScheduler sched = spy(new CapacityScheduler());
YarnConfiguration conf = new YarnConfiguration();
SchedulerEventDispatcher schedulerDispatcher =
new SchedulerEventDispatcher(sched);
EventDispatcher schedulerDispatcher =
new EventDispatcher(sched, sched.getClass().getName());
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
rmDispatcher.init(conf);
rmDispatcher.start();

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@ -985,7 +986,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm = new MockRM() {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) {
return new EventDispatcher<SchedulerEvent>(this.scheduler,
this.scheduler.getClass().getName()) {
@Override
public void handle(SchedulerEvent event) {
scheduler.handle(event);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@ -65,10 +66,11 @@ public class TestAMRMRPCNodeUpdates {
}
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) {
return new EventDispatcher<SchedulerEvent>(this.scheduler,
this.scheduler.getClass().getName()) {
@Override
public void handle(SchedulerEvent event) {
scheduler.handle(event);
super.handle(event);
}
};
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@ -423,10 +424,11 @@ public class TestAMRestart {
MockRM rm1 = new MockRM(conf, memStore) {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) {
return new EventDispatcher<SchedulerEvent>(this.scheduler,
this.scheduler.getClass().getName()) {
@Override
public void handle(SchedulerEvent event) {
scheduler.handle(event);
super.handle(event);
}
};
}

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.List;
public class TestTopKNodeSelector {
static class FakeNodeId extends NodeId {
final String host;
final int port;
public FakeNodeId(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public String getHost() {
return host;
}
@Override
public int getPort() {
return port;
}
@Override
protected void setHost(String host) {}
@Override
protected void setPort(int port) {}
@Override
protected void build() {}
@Override
public String toString() {
return host + ":" + port;
}
}
@Test
public void testQueueTimeSort() {
TopKNodeSelector selector = new TopKNodeSelector(5,
TopKNodeSelector.TopKComparator.WAIT_TIME);
selector.nodeUpdate(createRMNode("h1", 1, 15, 10));
selector.nodeUpdate(createRMNode("h2", 2, 5, 10));
selector.nodeUpdate(createRMNode("h3", 3, 10, 10));
selector.computeTask.run();
List<NodeId> nodeIds = selector.selectNodes();
System.out.println("1-> " + nodeIds);
Assert.assertEquals("h2:2", nodeIds.get(0).toString());
Assert.assertEquals("h3:3", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
// Now update node3
selector.nodeUpdate(createRMNode("h3", 3, 2, 10));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("2-> "+ nodeIds);
Assert.assertEquals("h3:3", nodeIds.get(0).toString());
Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
// Now send update with -1 wait time
selector.nodeUpdate(createRMNode("h4", 4, -1, 10));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("3-> "+ nodeIds);
// No change
Assert.assertEquals("h3:3", nodeIds.get(0).toString());
Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
}
@Test
public void testQueueLengthSort() {
TopKNodeSelector selector = new TopKNodeSelector(5,
TopKNodeSelector.TopKComparator.QUEUE_LENGTH);
selector.nodeUpdate(createRMNode("h1", 1, -1, 15));
selector.nodeUpdate(createRMNode("h2", 2, -1, 5));
selector.nodeUpdate(createRMNode("h3", 3, -1, 10));
selector.computeTask.run();
List<NodeId> nodeIds = selector.selectNodes();
System.out.println("1-> " + nodeIds);
Assert.assertEquals("h2:2", nodeIds.get(0).toString());
Assert.assertEquals("h3:3", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
// Now update node3
selector.nodeUpdate(createRMNode("h3", 3, -1, 2));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("2-> "+ nodeIds);
Assert.assertEquals("h3:3", nodeIds.get(0).toString());
Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
// Now send update with -1 wait time but valid length
selector.nodeUpdate(createRMNode("h4", 4, -1, 20));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("3-> "+ nodeIds);
// No change
Assert.assertEquals("h3:3", nodeIds.get(0).toString());
Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
Assert.assertEquals("h4:4", nodeIds.get(3).toString());
}
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength) {
RMNode node1 = Mockito.mock(RMNode.class);
NodeId nID1 = new FakeNodeId(host, port);
Mockito.when(node1.getNodeID()).thenReturn(nID1);
QueuedContainersStatus status1 =
Mockito.mock(QueuedContainersStatus.class);
Mockito.when(status1.getEstimatedQueueWaitTime())
.thenReturn(waitTime);
Mockito.when(status1.getWaitQueueLength())
.thenReturn(queueLength);
Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1);
return node1;
}
}