YARN-4412. Create ClusterMonitor to compute ordered list of preferred NMs for OPPORTUNITIC containers. (asuresh)

(cherry picked from commit 341888a0aa)
This commit is contained in:
Arun Suresh 2016-04-26 20:12:12 -07:00
parent 307cda70db
commit 1b4600abcf
28 changed files with 1037 additions and 132 deletions

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -190,6 +191,10 @@ public class NodeInfo {
return null; return null;
} }
public QueuedContainersStatus getQueuedContainersStatus() {
return null;
}
@Override @Override
public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getAggregatedContainersUtilization() {
return null; return null;

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -179,6 +180,10 @@ public class RMNodeWrapper implements RMNode {
return Collections.EMPTY_LIST; return Collections.EMPTY_LIST;
} }
public QueuedContainersStatus getQueuedContainersStatus() {
return null;
}
@Override @Override
public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getAggregatedContainersUtilization() {
return node.getAggregatedContainersUtilization(); return node.getAggregatedContainersUtilization();

View File

@ -338,6 +338,23 @@ public class YarnConfiguration extends Configuration {
public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT = public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
600000; 600000;
/** K least loaded nodes to be provided to the LocalScheduler of a
* NodeManager for Distributed Scheduling */
public static final String DIST_SCHEDULING_TOP_K =
YARN_PREFIX + "distributed-scheduling.top-k";
public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
/** Frequency for computing Top K Best Nodes */
public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS =
YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms";
public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000;
/** Comparator for determining Node Load for Distributed Scheduling */
public static final String DIST_SCHEDULING_TOP_K_COMPARATOR =
YARN_PREFIX + "distributed-scheduling.top-k-comparator";
public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT =
"QUEUE_LENGTH";
/** /**
* Enable/disable intermediate-data encryption at YARN level. For now, this * Enable/disable intermediate-data encryption at YARN level. For now, this
* only is used by the FileSystemRMStateStore to setup right file-system * only is used by the FileSystemRMStateStore to setup right file-system

View File

@ -114,6 +114,31 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPrefixToSkipCompare configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED); .add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
// Ignore Distributed Scheduling Related Configurations.
// Since it is still a "work in progress" feature
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_ENABLED);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_INCR_VCORES);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MAX_VCORES);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_MIN_VCORES);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS);
configurationPrefixToSkipCompare
.add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR);
// Set by container-executor.cfg // Set by container-executor.cfg
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR); configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.event;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
/**
* This is a specialized EventHandler to be used by Services that are expected
* handle a large number of events efficiently by ensuring that the caller
* thread is not blocked. Events are immediately stored in a BlockingQueue and
* a separate dedicated Thread consumes events from the queue and handles
* appropriately
* @param <T> Type of Event
*/
public class EventDispatcher<T extends Event> extends
AbstractService implements EventHandler<T> {
private final EventHandler<T> handler;
private final BlockingQueue<T> eventQueue =
new LinkedBlockingDeque<>();
private final Thread eventProcessor;
private volatile boolean stopped = false;
private boolean shouldExitOnError = false;
private static final Log LOG = LogFactory.getLog(EventDispatcher.class);
private final class EventProcessor implements Runnable {
@Override
public void run() {
T event;
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
return; // TODO: Kill RM.
}
try {
handler.handle(event);
} catch (Throwable t) {
// An error occurred, but we are shutting down anyway.
// If it was an InterruptedException, the very act of
// shutdown could have caused it and is probably harmless.
if (stopped) {
LOG.warn("Exception during shutdown: ", t);
break;
}
LOG.fatal("Error in handling event type " + event.getType()
+ " to the Event Dispatcher", t);
if (shouldExitOnError
&& !ShutdownHookManager.get().isShutdownInProgress()) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
}
}
public EventDispatcher(EventHandler<T> handler, String name) {
super(name);
this.handler = handler;
this.eventProcessor = new Thread(new EventProcessor());
this.eventProcessor.setName(getName() + ":Event Processor");
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.shouldExitOnError =
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
this.eventProcessor.start();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
this.stopped = true;
this.eventProcessor.interrupt();
try {
this.eventProcessor.join();
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
super.serviceStop();
}
@Override
public void handle(T event) {
try {
int qSize = eventQueue.size();
if (qSize !=0 && qSize %1000 == 0) {
LOG.info("Size of " + getName() + " event-queue is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.info("Very low remaining capacity on " + getName() + "" +
"event queue: " + remCapacity);
}
this.eventQueue.put(event);
} catch (InterruptedException e) {
LOG.info("Interrupted. Trying to exit gracefully.");
}
}
}

View File

@ -122,4 +122,13 @@ public abstract class NodeStatus {
@Unstable @Unstable
public abstract void setIncreasedContainers( public abstract void setIncreasedContainers(
List<Container> increasedContainers); List<Container> increasedContainers);
@Private
@Unstable
public abstract QueuedContainersStatus getQueuedContainersStatus();
@Private
@Unstable
public abstract void setQueuedContainersStatus(
QueuedContainersStatus queuedContainersStatus);
} }

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>
* <code>QueuedContainersStatus</code> captures information pertaining to the
* state of execution of the Queueable containers within a node.
* </p>
*/
@Private
@Evolving
public abstract class QueuedContainersStatus {
public static QueuedContainersStatus newInstance() {
return Records.newRecord(QueuedContainersStatus.class);
}
public abstract int getEstimatedQueueWaitTime();
public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
public abstract int getWaitQueueLength();
public abstract void setWaitQueueLength(int queueWaitTime);
}

View File

@ -33,14 +33,17 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -400,6 +403,27 @@ public class NodeStatusPBImpl extends NodeStatus {
this.increasedContainers = increasedContainers; this.increasedContainers = increasedContainers;
} }
@Override
public QueuedContainersStatus getQueuedContainersStatus() {
NodeStatusProtoOrBuilder p =
this.viaProto ? this.proto : this.builder;
if (!p.hasQueuedContainerStatus()) {
return null;
}
return convertFromProtoFormat(p.getQueuedContainerStatus());
}
@Override
public void setQueuedContainersStatus(QueuedContainersStatus queuedContainersStatus) {
maybeInitBuilder();
if (queuedContainersStatus == null) {
this.builder.clearQueuedContainerStatus();
return;
}
this.builder.setQueuedContainerStatus(
convertToProtoFormat(queuedContainersStatus));
}
private NodeIdProto convertToProtoFormat(NodeId nodeId) { private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto(); return ((NodeIdPBImpl)nodeId).getProto();
} }
@ -433,15 +457,25 @@ public class NodeStatusPBImpl extends NodeStatus {
return ((ApplicationIdPBImpl)c).getProto(); return ((ApplicationIdPBImpl)c).getProto();
} }
private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) { private YarnProtos.ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
return ((ResourceUtilizationPBImpl) r).getProto(); return ((ResourceUtilizationPBImpl) r).getProto();
} }
private ResourceUtilizationPBImpl convertFromProtoFormat( private ResourceUtilizationPBImpl convertFromProtoFormat(
ResourceUtilizationProto p) { YarnProtos.ResourceUtilizationProto p) {
return new ResourceUtilizationPBImpl(p); return new ResourceUtilizationPBImpl(p);
} }
private YarnServerCommonProtos.QueuedContainersStatusProto convertToProtoFormat(
QueuedContainersStatus r) {
return ((QueuedContainersStatusPBImpl) r).getProto();
}
private QueuedContainersStatus convertFromProtoFormat(
YarnServerCommonProtos.QueuedContainersStatusProto p) {
return new QueuedContainersStatusPBImpl(p);
}
private ContainerPBImpl convertFromProtoFormat( private ContainerPBImpl convertFromProtoFormat(
ContainerProto c) { ContainerProto c) {
return new ContainerPBImpl(c); return new ContainerPBImpl(c);

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records.impl.pb;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
public class QueuedContainersStatusPBImpl extends QueuedContainersStatus {
private YarnServerCommonProtos.QueuedContainersStatusProto proto =
YarnServerCommonProtos.QueuedContainersStatusProto.getDefaultInstance();
private YarnServerCommonProtos.QueuedContainersStatusProto.Builder builder = null;
private boolean viaProto = false;
public QueuedContainersStatusPBImpl() {
builder = YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder();
}
public QueuedContainersStatusPBImpl(YarnServerCommonProtos
.QueuedContainersStatusProto proto) {
this.proto = proto;
viaProto = true;
}
public YarnServerCommonProtos.QueuedContainersStatusProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder =
YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int getEstimatedQueueWaitTime() {
YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getEstimatedQueueWaitTime();
}
@Override
public void setEstimatedQueueWaitTime(int queueWaitTime) {
maybeInitBuilder();
builder.setEstimatedQueueWaitTime(queueWaitTime);
}
@Override
public int getWaitQueueLength() {
YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getWaitQueueLength();
}
@Override
public void setWaitQueueLength(int waitQueueLength) {
maybeInitBuilder();
builder.setWaitQueueLength(waitQueueLength);
}
}

View File

@ -39,6 +39,12 @@ message NodeStatusProto {
optional ResourceUtilizationProto containers_utilization = 6; optional ResourceUtilizationProto containers_utilization = 6;
optional ResourceUtilizationProto node_utilization = 7; optional ResourceUtilizationProto node_utilization = 7;
repeated ContainerProto increased_containers = 8; repeated ContainerProto increased_containers = 8;
optional QueuedContainersStatusProto queued_container_status = 9;
}
message QueuedContainersStatusProto {
optional int32 estimated_queue_wait_time = 1;
optional int32 wait_queue_length = 2;
} }
message MasterKeyProto { message MasterKeyProto {

View File

@ -39,8 +39,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
.NodeHeartbeatRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -131,4 +137,28 @@ public class TestProtocolRecords {
((NodeHeartbeatResponsePBImpl) record).getProto()); ((NodeHeartbeatResponsePBImpl) record).getProto());
Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps()); Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
} }
@Test
public void testNodeHeartBeatRequest() throws IOException {
NodeHeartbeatRequest record =
Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus nodeStatus =
Records.newRecord(NodeStatus.class);
QueuedContainersStatus queuedContainersStatus = Records.newRecord
(QueuedContainersStatus.class);
queuedContainersStatus.setEstimatedQueueWaitTime(123);
queuedContainersStatus.setWaitQueueLength(321);
nodeStatus.setQueuedContainersStatus(queuedContainersStatus);
record.setNodeStatus(nodeStatus);
NodeHeartbeatRequestPBImpl pb = new
NodeHeartbeatRequestPBImpl(
((NodeHeartbeatRequestPBImpl) record).getProto());
Assert.assertEquals(123,
pb.getNodeStatus()
.getQueuedContainersStatus().getEstimatedQueueWaitTime());
Assert.assertEquals(321,
pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength());
}
} }

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@ -459,9 +460,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
createKeepAliveApplicationList(), nodeHealthStatus, createKeepAliveApplicationList(), nodeHealthStatus,
containersUtilization, nodeUtilization, increasedContainers); containersUtilization, nodeUtilization, increasedContainers);
nodeStatus.setQueuedContainersStatus(getQueuedContainerStatus());
return nodeStatus; return nodeStatus;
} }
private QueuedContainersStatus getQueuedContainerStatus() {
QueuedContainersStatus status = QueuedContainersStatus.newInstance();
status.setWaitQueueLength(
this.context.getQueuingContext().getQueuedContainers().size());
return status;
}
/** /**
* Get the aggregated utilization of the containers in this node. * Get the aggregated utilization of the containers in this node.
* @return Resource utilization of all the containers. * @return Resource utilization of all the containers.

View File

@ -469,7 +469,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
+ "will be added to the queued containers."); + "will be added to the queued containers.");
AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo( AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
token, rcs.getStartRequest(), token.getExecutionType(), token, null, rcs.getStartRequest(), token.getExecutionType(),
token.getResource(), getConfig()); token.getResource(), getConfig());
this.context.getQueuingContext().getQueuedContainers().put( this.context.getQueuingContext().getQueuedContainers().put(

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import java.util.List;
public interface ClusterMonitor {
void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode);
void removeNode(RMNode removedRMNode);
void nodeUpdate(RMNode rmNode);
void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
}

View File

@ -18,10 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
@ -40,20 +44,62 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
.AMRMTokenSecretManager; .TopKNodeSelector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* The DistributedSchedulingService is started instead of the
* ApplicationMasterService if DistributedScheduling is enabled for the YARN
* cluster.
* It extends the functionality of the ApplicationMasterService by servicing
* clients (AMs and AMRMProxy request interceptors) that understand the
* DistributedSchedulingProtocol.
*/
public class DistributedSchedulingService extends ApplicationMasterService public class DistributedSchedulingService extends ApplicationMasterService
implements DistributedSchedulerProtocol { implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> {
private static final Log LOG =
LogFactory.getLog(DistributedSchedulingService.class);
private final TopKNodeSelector clusterMonitor;
private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
new ConcurrentHashMap<>();
public DistributedSchedulingService(RMContext rmContext, public DistributedSchedulingService(RMContext rmContext,
YarnScheduler scheduler) { YarnScheduler scheduler) {
super(DistributedSchedulingService.class.getName(), rmContext, scheduler); super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
int k = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.DIST_SCHEDULING_TOP_K,
YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
long topKComputationInterval = rmContext.getYarnConfiguration().getLong(
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS,
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT);
TopKNodeSelector.TopKComparator comparator =
TopKNodeSelector.TopKComparator.valueOf(
rmContext.getYarnConfiguration().get(
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR,
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT));
TopKNodeSelector topKSelector =
new TopKNodeSelector(k, topKComputationInterval, comparator);
this.clusterMonitor = topKSelector;
} }
@Override @Override
@ -63,8 +109,9 @@ public class DistributedSchedulingService extends ApplicationMasterService
addr, serverConf, secretManager, addr, serverConf, secretManager,
serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
// To support application running no NMs that DO NOT support // To support application running on NMs that DO NOT support
// Dist Scheduling... // Dist Scheduling... The server multiplexes both the
// ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
ApplicationMasterProtocolPB.class, ApplicationMasterProtocolPB.class,
ApplicationMasterProtocolService.newReflectiveBlockingService( ApplicationMasterProtocolService.newReflectiveBlockingService(
@ -141,10 +188,8 @@ public class DistributedSchedulingService extends ApplicationMasterService
this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
// Set nodes to be used for scheduling // Set nodes to be used for scheduling
// TODO: The actual computation of the list will happen in YARN-4412
// TODO: Till then, send the complete list
dsResp.setNodesForScheduling( dsResp.setNodesForScheduling(
new ArrayList<>(this.rmContext.getRMNodes().keySet())); new ArrayList<>(this.clusterMonitor.selectNodes()));
return dsResp; return dsResp;
} }
@ -156,7 +201,95 @@ public class DistributedSchedulingService extends ApplicationMasterService
(DistSchedAllocateResponse.class); (DistSchedAllocateResponse.class);
dsResp.setAllocateResponse(response); dsResp.setAllocateResponse(response);
dsResp.setNodesForScheduling( dsResp.setNodesForScheduling(
new ArrayList<>(this.rmContext.getRMNodes().keySet())); new ArrayList<>(this.clusterMonitor.selectNodes()));
return dsResp; return dsResp;
} }
private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
String rackName, NodeId nodeId) {
if (rackName != null) {
mapping.putIfAbsent(rackName, new HashSet<NodeId>());
Set<NodeId> nodeIds = mapping.get(rackName);
synchronized (nodeIds) {
nodeIds.add(nodeId);
}
}
}
private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
String rackName, NodeId nodeId) {
if (rackName != null) {
Set<NodeId> nodeIds = mapping.get(rackName);
synchronized (nodeIds) {
nodeIds.remove(nodeId);
}
}
}
@Override
public void handle(SchedulerEvent event) {
switch (event.getType()) {
case NODE_ADDED:
if (!(event instanceof NodeAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
clusterMonitor.addNode(nodeAddedEvent.getContainerReports(),
nodeAddedEvent.getAddedRMNode());
addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
nodeAddedEvent.getAddedRMNode().getNodeID());
addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
nodeAddedEvent.getAddedRMNode().getNodeID());
break;
case NODE_REMOVED:
if (!(event instanceof NodeRemovedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeRemovedSchedulerEvent nodeRemovedEvent =
(NodeRemovedSchedulerEvent)event;
clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
removeFromMapping(rackToNode,
nodeRemovedEvent.getRemovedRMNode().getRackName(),
nodeRemovedEvent.getRemovedRMNode().getNodeID());
removeFromMapping(hostToNode,
nodeRemovedEvent.getRemovedRMNode().getHostName(),
nodeRemovedEvent.getRemovedRMNode().getNodeID());
break;
case NODE_UPDATE:
if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode());
break;
case NODE_RESOURCE_UPDATE:
if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
(NodeResourceUpdateSchedulerEvent)event;
clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
nodeResourceUpdatedEvent.getResourceOption());
break;
// <-- IGNORED EVENTS : START -->
case APP_ADDED:
break;
case APP_REMOVED:
break;
case APP_ATTEMPT_ADDED:
break;
case APP_ATTEMPT_REMOVED:
break;
case CONTAINER_EXPIRED:
break;
case NODE_LABELS_UPDATE:
break;
// <-- IGNORED EVENTS : END -->
default:
LOG.error("Unknown event arrived at DistributedSchedulingService: "
+ event.toString());
}
}
} }

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service; import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@ -119,8 +119,6 @@ import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/** /**
* The ResourceManager is the main class that is a set of components. * The ResourceManager is the main class that is a set of components.
@ -371,7 +369,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler); return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
} }
protected Dispatcher createDispatcher() { protected Dispatcher createDispatcher() {
@ -733,108 +731,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
} }
@Private
public static class SchedulerEventDispatcher extends AbstractService
implements EventHandler<SchedulerEvent> {
private final ResourceScheduler scheduler;
private final BlockingQueue<SchedulerEvent> eventQueue =
new LinkedBlockingQueue<SchedulerEvent>();
private volatile int lastEventQueueSizeLogged = 0;
private final Thread eventProcessor;
private volatile boolean stopped = false;
private boolean shouldExitOnError = false;
public SchedulerEventDispatcher(ResourceScheduler scheduler) {
super(SchedulerEventDispatcher.class.getName());
this.scheduler = scheduler;
this.eventProcessor = new Thread(new EventProcessor());
this.eventProcessor.setName("ResourceManager Event Processor");
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.shouldExitOnError =
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
this.eventProcessor.start();
super.serviceStart();
}
private final class EventProcessor implements Runnable {
@Override
public void run() {
SchedulerEvent event;
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
return; // TODO: Kill RM.
}
try {
scheduler.handle(event);
} catch (Throwable t) {
// An error occurred, but we are shutting down anyway.
// If it was an InterruptedException, the very act of
// shutdown could have caused it and is probably harmless.
if (stopped) {
LOG.warn("Exception during shutdown: ", t);
break;
}
LOG.fatal("Error in handling event type " + event.getType()
+ " to the scheduler", t);
if (shouldExitOnError
&& !ShutdownHookManager.get().isShutdownInProgress()) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
}
}
@Override
protected void serviceStop() throws Exception {
this.stopped = true;
this.eventProcessor.interrupt();
try {
this.eventProcessor.join();
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
super.serviceStop();
}
@Override
public void handle(SchedulerEvent event) {
try {
int qSize = eventQueue.size();
if (qSize != 0 && qSize % 1000 == 0
&& lastEventQueueSizeLogged != qSize) {
lastEventQueueSizeLogged = qSize;
LOG.info("Size of scheduler event-queue is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.info("Very low remaining capacity on scheduler event queue: "
+ remCapacity);
}
this.eventQueue.put(event);
} catch (InterruptedException e) {
LOG.info("Interrupted. Trying to exit gracefully.");
}
}
}
@Private @Private
public static class RMFatalEventDispatcher public static class RMFatalEventDispatcher
implements EventHandler<RMFatalEvent> { implements EventHandler<RMFatalEvent> {
@ -1246,7 +1142,19 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (this.rmContext.getYarnConfiguration().getBoolean( if (this.rmContext.getYarnConfiguration().getBoolean(
YarnConfiguration.DIST_SCHEDULING_ENABLED, YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) { YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
return new DistributedSchedulingService(this.rmContext, scheduler); DistributedSchedulingService distributedSchedulingService = new
DistributedSchedulingService(this.rmContext, scheduler);
EventDispatcher distSchedulerEventDispatcher =
new EventDispatcher(distributedSchedulingService,
DistributedSchedulingService.class.getName());
// Add an event dispoatcher for the DistributedSchedulingService
// to handle node updates/additions and removals.
// Since the SchedulerEvent is currently a super set of theses,
// we register interest for it..
addService(distSchedulerEventDispatcher);
rmDispatcher.register(SchedulerEventType.class,
distSchedulerEventDispatcher);
return distributedSchedulingService;
} }
return new ApplicationMasterService(this.rmContext, scheduler); return new ApplicationMasterService(this.rmContext, scheduler);
} }

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
/** /**
* Node managers information on available resources * Node managers information on available resources
@ -172,4 +173,7 @@ public interface RMNode {
long getUntrackedTimeStamp(); long getUntrackedTimeStamp();
void setUntrackedTimeStamp(long timeStamp); void setUntrackedTimeStamp(long timeStamp);
public QueuedContainersStatus getQueuedContainersStatus();
} }

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
@ -127,6 +128,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* Resource utilization for the node. */ /* Resource utilization for the node. */
private ResourceUtilization nodeUtilization; private ResourceUtilization nodeUtilization;
/* Container Queue Information for the node.. Used by Distributed Scheduler */
private QueuedContainersStatus queuedContainersStatus;
private final ContainerAllocationExpirer containerAllocationExpirer; private final ContainerAllocationExpirer containerAllocationExpirer;
/* set of containers that have just launched */ /* set of containers that have just launched */
private final Set<ContainerId> launchedContainers = private final Set<ContainerId> launchedContainers =
@ -1130,7 +1134,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event; RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
rmNode.setQueuedContainersStatus(statusEvent.getContainerQueueInfo());
NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents( NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
rmNode, statusEvent); rmNode, statusEvent);
NodeState initialState = rmNode.getState(); NodeState initialState = rmNode.getState();
@ -1402,4 +1406,26 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public void setUntrackedTimeStamp(long ts) { public void setUntrackedTimeStamp(long ts) {
this.timeStamp = ts; this.timeStamp = ts;
} }
@Override
public QueuedContainersStatus getQueuedContainersStatus() {
this.readLock.lock();
try {
return this.queuedContainersStatus;
} finally {
this.readLock.unlock();
}
}
public void setQueuedContainersStatus(QueuedContainersStatus
queuedContainersStatus) {
this.writeLock.lock();
try {
this.queuedContainersStatus = queuedContainersStatus;
} finally {
this.writeLock.unlock();
}
}
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -79,6 +80,10 @@ public class RMNodeStatusEvent extends RMNodeEvent {
return this.logAggregationReportsForApps; return this.logAggregationReportsForApps;
} }
public QueuedContainersStatus getContainerQueueInfo() {
return this.nodeStatus.getQueuedContainersStatus();
}
public void setLogAggregationReportsForApps( public void setLogAggregationReportsForApps(
List<LogAggregationReport> logAggregationReportsForApps) { List<LogAggregationReport> logAggregationReportsForApps) {
this.logAggregationReportsForApps = logAggregationReportsForApps; this.logAggregationReportsForApps = logAggregationReportsForApps;
@ -89,4 +94,6 @@ public class RMNodeStatusEvent extends RMNodeEvent {
return this.nodeStatus.getIncreasedContainers() == null ? return this.nodeStatus.getIncreasedContainers() == null ?
Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers(); Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers();
} }
} }

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TopKNodeSelector implements ClusterMonitor {
final static Log LOG = LogFactory.getLog(TopKNodeSelector.class);
public enum TopKComparator implements Comparator<ClusterNode> {
WAIT_TIME,
QUEUE_LENGTH;
@Override
public int compare(ClusterNode o1, ClusterNode o2) {
if (getQuant(o1) == getQuant(o2)) {
return o1.timestamp < o2.timestamp ? +1 : -1;
}
return getQuant(o1) > getQuant(o2) ? +1 : -1;
}
private int getQuant(ClusterNode c) {
return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength;
}
}
static class ClusterNode {
int queueTime = -1;
int waitQueueLength = 0;
double timestamp;
final NodeId nodeId;
public ClusterNode(NodeId nodeId) {
this.nodeId = nodeId;
updateTimestamp();
}
public ClusterNode setQueueTime(int queueTime) {
this.queueTime = queueTime;
return this;
}
public ClusterNode setWaitQueueLength(int queueLength) {
this.waitQueueLength = queueLength;
return this;
}
public ClusterNode updateTimestamp() {
this.timestamp = System.currentTimeMillis();
return this;
}
}
private final int k;
private final List<NodeId> topKNodes;
private final ScheduledExecutorService scheduledExecutor;
private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>();
private final Comparator<ClusterNode> comparator;
Runnable computeTask = new Runnable() {
@Override
public void run() {
synchronized (topKNodes) {
topKNodes.clear();
topKNodes.addAll(computeTopKNodes());
}
}
};
@VisibleForTesting
TopKNodeSelector(int k, TopKComparator comparator) {
this.k = k;
this.topKNodes = new ArrayList<>();
this.comparator = comparator;
this.scheduledExecutor = null;
}
public TopKNodeSelector(int k, long nodeComputationInterval,
TopKComparator comparator) {
this.k = k;
this.topKNodes = new ArrayList<>();
this.scheduledExecutor = Executors.newScheduledThreadPool(1);
this.comparator = comparator;
this.scheduledExecutor.scheduleAtFixedRate(computeTask,
nodeComputationInterval, nodeComputationInterval,
TimeUnit.MILLISECONDS);
}
@Override
public void addNode(List<NMContainerStatus> containerStatuses, RMNode
rmNode) {
LOG.debug("Node added event from: " + rmNode.getNode().getName());
// Ignoring this currently : atleast one NODE_UPDATE heartbeat is
// required to ensure node eligibility.
}
@Override
public void removeNode(RMNode removedRMNode) {
LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
synchronized (this.clusterNodes) {
if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
this.clusterNodes.remove(removedRMNode.getNodeID());
LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
} else {
LOG.debug("Node not in list!");
}
}
}
@Override
public void nodeUpdate(RMNode rmNode) {
LOG.debug("Node update event from: " + rmNode.getNodeID());
QueuedContainersStatus queuedContainersStatus =
rmNode.getQueuedContainersStatus();
int estimatedQueueWaitTime =
queuedContainersStatus.getEstimatedQueueWaitTime();
int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
// Add nodes to clusterNodes.. if estimatedQueueTime is -1, Ignore node
// UNLESS comparator is based on queue length, in which case, we should add
synchronized (this.clusterNodes) {
ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
if (currentNode == null) {
if (estimatedQueueWaitTime != -1
|| comparator == TopKComparator.QUEUE_LENGTH) {
this.clusterNodes.put(rmNode.getNodeID(),
new ClusterNode(rmNode.getNodeID())
.setQueueTime(estimatedQueueWaitTime)
.setWaitQueueLength(waitQueueLength));
LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
} else {
LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
}
} else {
if (estimatedQueueWaitTime != -1
|| comparator == TopKComparator.QUEUE_LENGTH) {
currentNode
.setQueueTime(estimatedQueueWaitTime)
.setWaitQueueLength(waitQueueLength)
.updateTimestamp();
LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
} else {
this.clusterNodes.remove(rmNode.getNodeID());
LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
"with queue wait time [" + currentNode.queueTime + "] and " +
"wait queue length [" + currentNode.waitQueueLength + "]");
}
}
}
}
@Override
public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
LOG.debug("Node resource update event from: " + rmNode.getNodeID());
// Ignoring this currently...
}
public List<NodeId> selectNodes() {
synchronized (this.topKNodes) {
return this.k < this.topKNodes.size() ?
new ArrayList<>(this.topKNodes).subList(0, this.k) :
new ArrayList<>(this.topKNodes);
}
}
private List<NodeId> computeTopKNodes() {
synchronized (this.clusterNodes) {
ArrayList aList = new ArrayList<>(this.clusterNodes.values());
List<NodeId> retList = new ArrayList<>();
Object[] nodes = aList.toArray();
// Collections.sort would do something similar by calling Arrays.sort
// internally but would finally iterate through the input list (aList)
// to reset the value of each element.. Since we don't really care about
// 'aList', we can use the iteration to create the list of nodeIds which
// is what we ultimately care about.
Arrays.sort(nodes, (Comparator)comparator);
for (int j=0; j < nodes.length; j++) {
retList.add(((ClusterNode)nodes[j]).nodeId);
}
return retList;
}
}
}

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@ -268,6 +268,12 @@ public class MockNodes {
@Override @Override
public void setUntrackedTimeStamp(long timeStamp) { public void setUntrackedTimeStamp(long timeStamp) {
}
@Override
public QueuedContainersStatus getQueuedContainersStatus() {
return null;
} }
}; };

View File

@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -167,10 +169,11 @@ public class TestApplicationCleanup {
MockRM rm = new MockRM() { MockRM rm = new MockRM() {
@Override @Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) { return new EventDispatcher<SchedulerEvent>(this.scheduler,
this.scheduler.getClass().getName()) {
@Override @Override
public void handle(SchedulerEvent event) { public void handle(SchedulerEvent event) {
scheduler.handle(event); super.handle(event);
} }
}; };
} }

View File

@ -72,6 +72,11 @@ public class TestDistributedSchedulingService {
public AMLivelinessMonitor getAMLivelinessMonitor() { public AMLivelinessMonitor getAMLivelinessMonitor() {
return null; return null;
} }
@Override
public Configuration getYarnConfiguration() {
return new YarnConfiguration();
}
}; };
DistributedSchedulingService service = DistributedSchedulingService service =
new DistributedSchedulingService(rmContext, null) { new DistributedSchedulingService(rmContext, null) {

View File

@ -27,7 +27,7 @@ import static org.mockito.Mockito.verify;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher; import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
@ -44,8 +44,8 @@ public class TestRMDispatcher {
AsyncDispatcher rmDispatcher = new AsyncDispatcher(); AsyncDispatcher rmDispatcher = new AsyncDispatcher();
CapacityScheduler sched = spy(new CapacityScheduler()); CapacityScheduler sched = spy(new CapacityScheduler());
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
SchedulerEventDispatcher schedulerDispatcher = EventDispatcher schedulerDispatcher =
new SchedulerEventDispatcher(sched); new EventDispatcher(sched, sched.getClass().getName());
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
rmDispatcher.init(conf); rmDispatcher.init(conf);
rmDispatcher.start(); rmDispatcher.start();

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@ -987,7 +988,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm = new MockRM() { rm = new MockRM() {
@Override @Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) { return new EventDispatcher<SchedulerEvent>(this.scheduler,
this.scheduler.getClass().getName()) {
@Override @Override
public void handle(SchedulerEvent event) { public void handle(SchedulerEvent event) {
scheduler.handle(event); scheduler.handle(event);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -65,10 +66,11 @@ public class TestAMRMRPCNodeUpdates {
} }
@Override @Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) { return new EventDispatcher<SchedulerEvent>(this.scheduler,
this.scheduler.getClass().getName()) {
@Override @Override
public void handle(SchedulerEvent event) { public void handle(SchedulerEvent event) {
scheduler.handle(event); super.handle(event);
} }
}; };
} }

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@ -422,10 +423,11 @@ public class TestAMRestart {
MockRM rm1 = new MockRM(conf, memStore) { MockRM rm1 = new MockRM(conf, memStore) {
@Override @Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) { return new EventDispatcher<SchedulerEvent>(this.scheduler,
this.scheduler.getClass().getName()) {
@Override @Override
public void handle(SchedulerEvent event) { public void handle(SchedulerEvent event) {
scheduler.handle(event); super.handle(event);
} }
}; };
} }

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.List;
public class TestTopKNodeSelector {
static class FakeNodeId extends NodeId {
final String host;
final int port;
public FakeNodeId(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public String getHost() {
return host;
}
@Override
public int getPort() {
return port;
}
@Override
protected void setHost(String host) {}
@Override
protected void setPort(int port) {}
@Override
protected void build() {}
@Override
public String toString() {
return host + ":" + port;
}
}
@Test
public void testQueueTimeSort() {
TopKNodeSelector selector = new TopKNodeSelector(5,
TopKNodeSelector.TopKComparator.WAIT_TIME);
selector.nodeUpdate(createRMNode("h1", 1, 15, 10));
selector.nodeUpdate(createRMNode("h2", 2, 5, 10));
selector.nodeUpdate(createRMNode("h3", 3, 10, 10));
selector.computeTask.run();
List<NodeId> nodeIds = selector.selectNodes();
System.out.println("1-> " + nodeIds);
Assert.assertEquals("h2:2", nodeIds.get(0).toString());
Assert.assertEquals("h3:3", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
// Now update node3
selector.nodeUpdate(createRMNode("h3", 3, 2, 10));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("2-> "+ nodeIds);
Assert.assertEquals("h3:3", nodeIds.get(0).toString());
Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
// Now send update with -1 wait time
selector.nodeUpdate(createRMNode("h4", 4, -1, 10));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("3-> "+ nodeIds);
// No change
Assert.assertEquals("h3:3", nodeIds.get(0).toString());
Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
}
@Test
public void testQueueLengthSort() {
TopKNodeSelector selector = new TopKNodeSelector(5,
TopKNodeSelector.TopKComparator.QUEUE_LENGTH);
selector.nodeUpdate(createRMNode("h1", 1, -1, 15));
selector.nodeUpdate(createRMNode("h2", 2, -1, 5));
selector.nodeUpdate(createRMNode("h3", 3, -1, 10));
selector.computeTask.run();
List<NodeId> nodeIds = selector.selectNodes();
System.out.println("1-> " + nodeIds);
Assert.assertEquals("h2:2", nodeIds.get(0).toString());
Assert.assertEquals("h3:3", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
// Now update node3
selector.nodeUpdate(createRMNode("h3", 3, -1, 2));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("2-> "+ nodeIds);
Assert.assertEquals("h3:3", nodeIds.get(0).toString());
Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
// Now send update with -1 wait time but valid length
selector.nodeUpdate(createRMNode("h4", 4, -1, 20));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("3-> "+ nodeIds);
// No change
Assert.assertEquals("h3:3", nodeIds.get(0).toString());
Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
Assert.assertEquals("h4:4", nodeIds.get(3).toString());
}
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength) {
RMNode node1 = Mockito.mock(RMNode.class);
NodeId nID1 = new FakeNodeId(host, port);
Mockito.when(node1.getNodeID()).thenReturn(nID1);
QueuedContainersStatus status1 =
Mockito.mock(QueuedContainersStatus.class);
Mockito.when(status1.getEstimatedQueueWaitTime())
.thenReturn(waitTime);
Mockito.when(status1.getWaitQueueLength())
.thenReturn(queueLength);
Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1);
return node1;
}
}