merge YARN-365 from trunk. Change NM heartbeat handling to not generate a scheduler event on each heartbeat. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1450008 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-02-26 03:33:01 +00:00
parent 89f14ca928
commit c215e12419
12 changed files with 287 additions and 106 deletions

View File

@ -8,6 +8,9 @@ Release 2.0.4-beta - UNRELEASED
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event
on each heartbeat. (Xuan Gong via sseth)
OPTIMIZATIONS
BUG FIXES

View File

@ -106,4 +106,13 @@ public interface RMNode {
public List<ApplicationId> getAppsToCleanup();
public HeartbeatResponse getLastHeartBeatResponse();
/**
* Get and clear the list of containerUpdates accumulated across NM
* heartbeats.
*
* @return containerUpdates accumulated across NM heartbeats.
*/
public List<UpdatedContainerInfo> pullContainerUpdates();
}

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -60,6 +61,8 @@ import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BuilderUtils.ContainerIdComparator;
import com.google.common.annotations.VisibleForTesting;
/**
* This class is used to keep track of all the applications/containers
* running on a node.
@ -78,6 +81,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private final ReadLock readLock;
private final WriteLock writeLock;
private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue;
private volatile boolean nextHeartBeat = true;
private final NodeId nodeId;
private final RMContext context;
private final String hostName;
@ -186,6 +192,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.stateMachine = stateMachineFactory.make(this);
this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
}
@Override
@ -400,6 +407,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Kill containers since node is rejoining.
rmNode.nodeUpdateQueue.clear();
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
@ -458,6 +466,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
rmNode.nodeUpdateQueue.clear();
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
rmNode.context.getDispatcher().getEventHandler().handle(
@ -489,6 +498,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
statusEvent.getNodeHealthStatus();
rmNode.setNodeHealthStatus(remoteNodeHealthStatus);
if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
rmNode.nodeUpdateQueue.clear();
// Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode));
@ -538,10 +548,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
completedContainers.add(remoteContainer);
}
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers,
completedContainers));
if(newlyLaunchedContainers.size() != 0
|| completedContainers.size() != 0) {
rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo
(newlyLaunchedContainers, completedContainers));
}
if(rmNode.nextHeartBeat) {
rmNode.nextHeartBeat = false;
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode));
}
rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
statusEvent.getKeepAliveAppIds());
@ -584,4 +600,25 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
return NodeState.UNHEALTHY;
}
}
@Override
public List<UpdatedContainerInfo> pullContainerUpdates() {
List<UpdatedContainerInfo> latestContainerInfoList =
new ArrayList<UpdatedContainerInfo>();
while(nodeUpdateQueue.peek() != null){
latestContainerInfoList.add(nodeUpdateQueue.poll());
}
this.nextHeartBeat = true;
return latestContainerInfoList;
}
@VisibleForTesting
public void setNextHeartBeat(boolean nextHeartBeat) {
this.nextHeartBeat = nextHeartBeat;
}
@VisibleForTesting
public int getQueueSize() {
return nodeUpdateQueue.size();
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
public class UpdatedContainerInfo {
private List<ContainerStatus> newlyLaunchedContainers;
private List<ContainerStatus> completedContainers;
public UpdatedContainerInfo() {
}
public UpdatedContainerInfo(List<ContainerStatus> newlyLaunchedContainers
, List<ContainerStatus> completedContainers) {
this.newlyLaunchedContainers = newlyLaunchedContainers;
this.completedContainers = completedContainers;
}
public List<ContainerStatus> getNewlyLaunchedContainers() {
return this.newlyLaunchedContainers;
}
public List<ContainerStatus> getCompletedContainers() {
return this.completedContainers;
}
}

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -562,15 +563,20 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
return root.getQueueUserAclInfo(user);
}
private synchronized void nodeUpdate(RMNode nm,
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
}
FiCaSchedulerNode node = getNode(nm.getNodeID());
FiCaSchedulerNode node = getNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@ -666,9 +672,7 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers());
nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;
case APP_ADDED:

View File

@ -18,35 +18,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class NodeUpdateSchedulerEvent extends SchedulerEvent {
private final RMNode rmNode;
private final List<ContainerStatus> newlyLaunchedContainers;
private final List<ContainerStatus> completedContainersStatuses;
public NodeUpdateSchedulerEvent(RMNode rmNode,
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
public NodeUpdateSchedulerEvent(RMNode rmNode) {
super(SchedulerEventType.NODE_UPDATE);
this.rmNode = rmNode;
this.newlyLaunchedContainers = newlyLaunchedContainers;
this.completedContainersStatuses = completedContainers;
}
public RMNode getRMNode() {
return rmNode;
}
public List<ContainerStatus> getNewlyLaunchedContainers() {
return newlyLaunchedContainers;
}
public List<ContainerStatus> getCompletedContainers() {
return completedContainersStatuses;
}
}

View File

@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -746,15 +746,20 @@ public class FairScheduler implements ResourceScheduler {
/**
* Process a heartbeat update from a node.
*/
private synchronized void nodeUpdate(RMNode nm,
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = nodes.get(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@ -860,9 +865,7 @@ public class FairScheduler implements ResourceScheduler {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers());
nodeUpdate(nodeUpdatedEvent.getRMNode());
break;
case APP_ADDED:
if (!(event instanceof AppAddedSchedulerEvent)) {

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@ -576,11 +577,16 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
return assignedContainers;
}
private synchronized void nodeUpdate(RMNode rmNode,
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
@ -628,9 +634,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
{
NodeUpdateSchedulerEvent nodeUpdatedEvent =
(NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers());
nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;
case APP_ADDED:

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import com.google.common.collect.Lists;
@ -187,6 +190,11 @@ public class MockNodes {
public HeartbeatResponse getLastHeartBeatResponse() {
return null;
}
@Override
public List<UpdatedContainerInfo> pullContainerUpdates() {
return new ArrayList<UpdatedContainerInfo>();
}
};
private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {

View File

@ -201,7 +201,7 @@ public class TestFifoScheduler {
testMinimumAllocation(conf, allocMB / 2);
}
@Test
@Test (timeout = 5000)
public void testReconnectedNode() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.setQueues("default", new String[] {"default"});
@ -215,19 +215,19 @@ public class TestFifoScheduler {
fs.handle(new NodeAddedSchedulerEvent(n1));
fs.handle(new NodeAddedSchedulerEvent(n2));
List<ContainerStatus> emptyList = new ArrayList<ContainerStatus>();
fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
fs.handle(new NodeUpdateSchedulerEvent(n1));
Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB());
// reconnect n1 with downgraded memory
n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
fs.handle(new NodeRemovedSchedulerEvent(n1));
fs.handle(new NodeAddedSchedulerEvent(n1));
fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList));
fs.handle(new NodeUpdateSchedulerEvent(n1));
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
}
@Test
@Test (timeout = 5000)
public void testHeadroom() throws Exception {
Configuration conf = new Configuration();
@ -275,7 +275,7 @@ public class TestFifoScheduler {
fs.allocate(appAttemptId2, ask2, emptyId);
// Trigger container assignment
fs.handle(new NodeUpdateSchedulerEvent(n1, emptyStatus, emptyStatus));
fs.handle(new NodeUpdateSchedulerEvent(n1));
// Get the allocation for the applications and verify headroom
Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId);

View File

@ -22,6 +22,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.Collections;
@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -63,7 +65,7 @@ public class TestRMNodeTransitions {
private YarnScheduler scheduler;
private SchedulerEventType eventType;
private List<ContainerStatus> completedContainers;
private List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
private final class TestSchedulerEventDispatcher implements
EventHandler<SchedulerEvent> {
@ -89,10 +91,11 @@ public class TestRMNodeTransitions {
final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
eventType = event.getType();
if (eventType == SchedulerEventType.NODE_UPDATE) {
completedContainers =
((NodeUpdateSchedulerEvent)event).getCompletedContainers();
} else {
completedContainers = null;
List<UpdatedContainerInfo> lastestContainersInfoList =
((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates();
for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) {
completedContainers.addAll(lastestContainersInfo.getCompletedContainers());
}
}
return null;
}
@ -125,16 +128,16 @@ public class TestRMNodeTransitions {
return event;
}
@Test
@Test (timeout = 5000)
public void testExpiredContainer() {
// Start the node
node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
// Expire a container
ContainerId completedContainerId = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(0, 0), 0), 0);
ContainerId completedContainerId = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(0, 0), 0), 0);
node.handle(new RMNodeCleanContainerEvent(null, completedContainerId));
Assert.assertEquals(1, node.getContainersToCleanUp().size());
@ -146,9 +149,110 @@ public class TestRMNodeTransitions {
doReturn(Collections.singletonList(containerStatus)).
when(statusEvent).getContainers();
node.handle(statusEvent);
Assert.assertEquals(0, completedContainers.size());
/* Expect the scheduler call handle function 2 times
* 1. RMNode status from new to Running, handle the add_node event
* 2. handle the node update event
*/
verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class));
}
@Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{
//Start the node
node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null);
node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(0, 0), 0), 0);
ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(1, 1), 1), 1);
ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(1, 1), 1), 2);
RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent();
RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent();
RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent();
ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);
doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1)
.getContainerId();
doReturn(Collections.singletonList(containerStatusFromNode1))
.when(statusEventFromNode1).getContainers();
node.handle(statusEventFromNode1);
Assert.assertEquals(1, completedContainers.size());
Assert.assertEquals(completedContainerIdFromNode1,
completedContainers.get(0).getContainerId());
completedContainers.clear();
doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1)
.getContainerId();
doReturn(Collections.singletonList(containerStatusFromNode2_1))
.when(statusEventFromNode2_1).getContainers();
doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2)
.getContainerId();
doReturn(Collections.singletonList(containerStatusFromNode2_2))
.when(statusEventFromNode2_2).getContainers();
node2.setNextHeartBeat(false);
node2.handle(statusEventFromNode2_1);
node2.setNextHeartBeat(true);
node2.handle(statusEventFromNode2_2);
Assert.assertEquals(2, completedContainers.size());
Assert.assertEquals(completedContainerIdFromNode2_1,completedContainers.get(0)
.getContainerId());
Assert.assertEquals(completedContainerIdFromNode2_2,completedContainers.get(1)
.getContainerId());
}
@Test (timeout = 5000)
public void testStatusChange(){
//Start the node
node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
//Add info to the queue first
node.setNextHeartBeat(false);
ContainerId completedContainerId1 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(0, 0), 0), 0);
ContainerId completedContainerId2 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(1, 1), 1), 1);
RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent();
RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent();
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
ContainerStatus containerStatus2 = mock(ContainerStatus.class);
doReturn(completedContainerId1).when(containerStatus1).getContainerId();
doReturn(Collections.singletonList(containerStatus1))
.when(statusEvent1).getContainers();
doReturn(completedContainerId2).when(containerStatus2).getContainerId();
doReturn(Collections.singletonList(containerStatus2))
.when(statusEvent2).getContainers();
verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class));
node.handle(statusEvent1);
node.handle(statusEvent2);
verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class));
Assert.assertEquals(2, node.getQueueSize());
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
Assert.assertEquals(0, node.getQueueSize());
}
@Test
public void testRunningExpire() {
RMNodeImpl node = getRunningNode();

View File

@ -30,7 +30,6 @@ import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -267,7 +266,7 @@ public class TestFairScheduler {
Assert.assertEquals(3, queueManager.getLeafQueues().size());
}
@Test
@Test (timeout = 5000)
public void testSimpleContainerAllocation() {
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@ -283,8 +282,7 @@ public class TestFairScheduler {
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
// Asked for less than min_allocation.
@ -292,15 +290,14 @@ public class TestFairScheduler {
scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory());
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(updateEvent2);
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory());
}
@Test
@Test (timeout = 5000)
public void testSimpleContainerReservation() throws InterruptedException {
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@ -310,8 +307,7 @@ public class TestFairScheduler {
// Queue 1 requests full capacity of node
createSchedulingRequest(1024, "queue1", "user1", 1);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
// Make sure queue 1 is allocated app capacity
@ -331,8 +327,7 @@ public class TestFairScheduler {
// Now another node checks in with capacity
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
scheduler.handle(updateEvent2);
@ -729,7 +724,7 @@ public class TestFairScheduler {
assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
}
@Test
@Test (timeout = 5000)
public void testIsStarvedForMinShare() throws Exception {
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
@ -758,8 +753,7 @@ public class TestFairScheduler {
// Queue A wants 3 * 1024. Node update gives this all to A
createSchedulingRequest(3 * 1024, "queueA", "user1");
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
// Queue B arrives and wants 1 * 1024
@ -788,7 +782,7 @@ public class TestFairScheduler {
}
}
@Test
@Test (timeout = 5000)
public void testIsStarvedForFairShare() throws Exception {
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
@ -817,8 +811,7 @@ public class TestFairScheduler {
// Queue A wants 3 * 1024. Node update gives this all to A
createSchedulingRequest(3 * 1024, "queueA", "user1");
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
// Queue B arrives and wants 1 * 1024
@ -848,7 +841,7 @@ public class TestFairScheduler {
}
}
@Test
@Test (timeout = 5000)
/**
* Make sure containers are chosen to be preempted in the correct order. Right
* now this means decreasing order of priority.
@ -912,16 +905,13 @@ public class TestFairScheduler {
// Sufficient node check-ins to fully schedule containers
for (int i = 0; i < 2; i++) {
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeUpdate1);
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(nodeUpdate2);
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
scheduler.handle(nodeUpdate3);
}
@ -982,7 +972,7 @@ public class TestFairScheduler {
assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
}
@Test
@Test (timeout = 5000)
/**
* Tests the timing of decision to preempt tasks.
*/
@ -1053,16 +1043,13 @@ public class TestFairScheduler {
// Sufficient node check-ins to fully schedule containers
for (int i = 0; i < 2; i++) {
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeUpdate1);
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(nodeUpdate2);
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
scheduler.handle(nodeUpdate3);
}
@ -1110,7 +1097,7 @@ public class TestFairScheduler {
Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
}
@Test
@Test (timeout = 5000)
public void testMultipleContainersWaitingForReservation() {
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@ -1120,8 +1107,7 @@ public class TestFairScheduler {
// Request full capacity of node
createSchedulingRequest(1024, "queue1", "user1", 1);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue2", "user2", 1);
@ -1137,7 +1123,7 @@ public class TestFairScheduler {
scheduler.applications.get(attId2).getCurrentReservation().getMemory());
}
@Test
@Test (timeout = 5000)
public void testUserMaxRunningApps() throws Exception {
// Set max running apps
Configuration conf = createConfiguration();
@ -1166,8 +1152,7 @@ public class TestFairScheduler {
"user1", 1);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
// App 1 should be running
@ -1192,7 +1177,7 @@ public class TestFairScheduler {
assertEquals(2, scheduler.applications.get(attId1).getLiveContainers().size());
}
@Test
@Test (timeout = 5000)
public void testReservationWhileMultiplePriorities() {
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@ -1202,8 +1187,7 @@ public class TestFairScheduler {
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
"user1", 1, 2);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
FSSchedulerApp app = scheduler.applications.get(attId);
@ -1276,7 +1260,7 @@ public class TestFairScheduler {
assertNull("The application was allowed", app2);
}
@Test
@Test (timeout = 5000)
public void testMultipleNodesSingleRackRequest() throws Exception {
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
@ -1303,22 +1287,20 @@ public class TestFairScheduler {
// node 1 checks in
scheduler.update();
NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent1);
// should assign node local
assertEquals(1, scheduler.applications.get(appId).getLiveContainers().size());
// node 2 checks in
scheduler.update();
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(updateEvent2);
// should assign rack local
assertEquals(2, scheduler.applications.get(appId).getLiveContainers().size());
}
@Test
@Test (timeout = 5000)
public void testFifoWithinQueue() throws Exception {
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(3072));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
@ -1342,8 +1324,7 @@ public class TestFairScheduler {
// Because tests set assignmultiple to false, each heartbeat assigns a single
// container.
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
assertEquals(1, app1.getLiveContainers().size());