YARN-4138. Roll back container resource allocation after resource increase token expires. Contributed by Meng Ding
This commit is contained in:
parent
145add1aec
commit
fc06ae38ca
|
@ -602,6 +602,10 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-4420. Add REST API for List Reservations. (Sean Po via curino)
|
YARN-4420. Add REST API for List Reservations. (Sean Po via curino)
|
||||||
|
|
||||||
|
YARN-4138. Roll back container resource allocation after resource
|
||||||
|
increase token expires. (Meng Ding via jianhe)
|
||||||
|
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
|
|
@ -0,0 +1,75 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
|
||||||
|
public class AllocationExpirationInfo implements
|
||||||
|
Comparable<AllocationExpirationInfo> {
|
||||||
|
|
||||||
|
private final ContainerId containerId;
|
||||||
|
private final boolean increase;
|
||||||
|
|
||||||
|
public AllocationExpirationInfo(ContainerId containerId) {
|
||||||
|
this(containerId, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AllocationExpirationInfo(
|
||||||
|
ContainerId containerId, boolean increase) {
|
||||||
|
this.containerId = containerId;
|
||||||
|
this.increase = increase;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContainerId getContainerId() {
|
||||||
|
return this.containerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isIncrease() {
|
||||||
|
return this.increase;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return (getContainerId().hashCode() << 16);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (!(other instanceof AllocationExpirationInfo)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return compareTo((AllocationExpirationInfo)other) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(AllocationExpirationInfo other) {
|
||||||
|
if (other == null) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
// Only need to compare containerId.
|
||||||
|
return getContainerId().compareTo(other.getContainerId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "<container=" + getContainerId() + ", increase="
|
||||||
|
+ isIncrease() + ">";
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
@ -28,7 +27,7 @@ import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
|
||||||
|
|
||||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
public class ContainerAllocationExpirer extends
|
public class ContainerAllocationExpirer extends
|
||||||
AbstractLivelinessMonitor<ContainerId> {
|
AbstractLivelinessMonitor<AllocationExpirationInfo> {
|
||||||
|
|
||||||
private EventHandler dispatcher;
|
private EventHandler dispatcher;
|
||||||
|
|
||||||
|
@ -47,7 +46,9 @@ public class ContainerAllocationExpirer extends
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void expire(ContainerId containerId) {
|
protected void expire(AllocationExpirationInfo allocationExpirationInfo) {
|
||||||
dispatcher.handle(new ContainerExpiredSchedulerEvent(containerId));
|
dispatcher.handle(new ContainerExpiredSchedulerEvent(
|
||||||
|
allocationExpirationInfo.getContainerId(),
|
||||||
|
allocationExpirationInfo.isIncrease()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,8 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
|
||||||
|
|
||||||
Resource getAllocatedResource();
|
Resource getAllocatedResource();
|
||||||
|
|
||||||
|
Resource getLastConfirmedResource();
|
||||||
|
|
||||||
NodeId getAllocatedNode();
|
NodeId getAllocatedNode();
|
||||||
|
|
||||||
Priority getAllocatedPriority();
|
Priority getAllocatedPriority();
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -49,12 +50,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
|
||||||
|
.RMNodeDecreaseContainerEvent;
|
||||||
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
|
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
|
||||||
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||||
import org.apache.hadoop.yarn.state.StateMachine;
|
import org.apache.hadoop.yarn.state.StateMachine;
|
||||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
|
||||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
|
@ -119,9 +123,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
RMContainerEventType.RELEASED, new KillTransition())
|
RMContainerEventType.RELEASED, new KillTransition())
|
||||||
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
|
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
|
||||||
RMContainerEventType.RESERVED, new ContainerReservedTransition())
|
RMContainerEventType.RESERVED, new ContainerReservedTransition())
|
||||||
.addTransition(RMContainerState.RUNNING, RMContainerState.EXPIRED,
|
|
||||||
RMContainerEventType.EXPIRE,
|
|
||||||
new ContainerExpiredWhileRunningTransition())
|
|
||||||
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
|
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
|
||||||
RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
|
RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
|
||||||
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
|
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
|
||||||
|
@ -177,7 +178,10 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
private List<ResourceRequest> resourceRequests;
|
private List<ResourceRequest> resourceRequests;
|
||||||
|
|
||||||
private volatile boolean hasIncreaseReservation = false;
|
private volatile boolean hasIncreaseReservation = false;
|
||||||
|
// Only used for container resource increase and decrease. This is the
|
||||||
|
// resource to rollback to should container resource increase token expires.
|
||||||
|
private Resource lastConfirmedResource;
|
||||||
|
|
||||||
public RMContainerImpl(Container container,
|
public RMContainerImpl(Container container,
|
||||||
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
|
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
|
||||||
RMContext rmContext) {
|
RMContext rmContext) {
|
||||||
|
@ -210,6 +214,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
this.isAMContainer = false;
|
this.isAMContainer = false;
|
||||||
this.resourceRequests = null;
|
this.resourceRequests = null;
|
||||||
this.nodeLabelExpression = nodeLabelExpression;
|
this.nodeLabelExpression = nodeLabelExpression;
|
||||||
|
this.lastConfirmedResource = container.getResource();
|
||||||
|
|
||||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
this.readLock = lock.readLock();
|
this.readLock = lock.readLock();
|
||||||
|
@ -283,6 +288,16 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource getLastConfirmedResource() {
|
||||||
|
try {
|
||||||
|
readLock.lock();
|
||||||
|
return this.lastConfirmedResource;
|
||||||
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NodeId getAllocatedNode() {
|
public NodeId getAllocatedNode() {
|
||||||
return container.getNodeId();
|
return container.getNodeId();
|
||||||
|
@ -525,7 +540,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
container.setResourceRequests(null);
|
container.setResourceRequests(null);
|
||||||
|
|
||||||
// Register with containerAllocationExpirer.
|
// Register with containerAllocationExpirer.
|
||||||
container.containerAllocationExpirer.register(container.getContainerId());
|
container.containerAllocationExpirer.register(
|
||||||
|
new AllocationExpirationInfo(container.getContainerId()));
|
||||||
|
|
||||||
// Tell the app
|
// Tell the app
|
||||||
container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
|
container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
|
||||||
|
@ -543,7 +559,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
if (acquiredEvent.isIncreasedContainer()) {
|
if (acquiredEvent.isIncreasedContainer()) {
|
||||||
// If container is increased but not acquired by AM, we will start
|
// If container is increased but not acquired by AM, we will start
|
||||||
// containerAllocationExpirer for this container in this transition.
|
// containerAllocationExpirer for this container in this transition.
|
||||||
container.containerAllocationExpirer.register(event.getContainerId());
|
container.containerAllocationExpirer.register(
|
||||||
|
new AllocationExpirationInfo(event.getContainerId(), true));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -553,22 +570,65 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
||||||
// Unregister the allocation expirer, it is already increased..
|
RMContainerNMDoneChangeResourceEvent nmDoneChangeResourceEvent =
|
||||||
container.containerAllocationExpirer.unregister(event.getContainerId());
|
(RMContainerNMDoneChangeResourceEvent)event;
|
||||||
}
|
Resource rmContainerResource = container.getAllocatedResource();
|
||||||
}
|
Resource nmContainerResource =
|
||||||
|
nmDoneChangeResourceEvent.getNMContainerResource();
|
||||||
private static final class ContainerExpiredWhileRunningTransition extends
|
|
||||||
BaseTransition {
|
|
||||||
|
|
||||||
@Override
|
if (Resources.equals(rmContainerResource, nmContainerResource)) {
|
||||||
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
// If rmContainerResource == nmContainerResource, the resource
|
||||||
// When the container expired, and it has a pending increased request, we
|
// increase is confirmed.
|
||||||
// will kill the container.
|
// In this case:
|
||||||
// TODO, we can do better for this: roll back container resource to the
|
// - Set the lastConfirmedResource as nmContainerResource
|
||||||
// resource before increase, and notify scheduler about this decrease as
|
// - Unregister the allocation expirer
|
||||||
// well. Will do that in a separated JIRA.
|
container.lastConfirmedResource = nmContainerResource;
|
||||||
new KillTransition().transition(container, event);
|
container.containerAllocationExpirer.unregister(
|
||||||
|
new AllocationExpirationInfo(event.getContainerId()));
|
||||||
|
} else if (Resources.fitsIn(rmContainerResource, nmContainerResource)) {
|
||||||
|
// If rmContainerResource < nmContainerResource, this is caused by the
|
||||||
|
// following sequence:
|
||||||
|
// 1. AM asks for increase from 1G to 5G, and RM approves it
|
||||||
|
// 2. AM acquires the increase token and increases on NM
|
||||||
|
// 3. Before NM reports 5G to RM to confirm the increase, AM sends
|
||||||
|
// a decrease request to 4G, and RM approves it
|
||||||
|
// 4. When NM reports 5G to RM, RM now sees its own allocation as 4G
|
||||||
|
// In this cases:
|
||||||
|
// - Set the lastConfirmedResource as rmContainerResource
|
||||||
|
// - Unregister the allocation expirer
|
||||||
|
// - Notify NM to reduce its resource to rmContainerResource
|
||||||
|
container.lastConfirmedResource = rmContainerResource;
|
||||||
|
container.containerAllocationExpirer.unregister(
|
||||||
|
new AllocationExpirationInfo(event.getContainerId()));
|
||||||
|
container.eventHandler.handle(new RMNodeDecreaseContainerEvent(
|
||||||
|
container.nodeId,
|
||||||
|
Collections.singletonList(container.getContainer())));
|
||||||
|
} else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) {
|
||||||
|
// If nmContainerResource < rmContainerResource, this is caused by the
|
||||||
|
// following sequence:
|
||||||
|
// 1. AM asks for increase from 1G to 2G, and RM approves it
|
||||||
|
// 2. AM asks for increase from 2G to 4G, and RM approves it
|
||||||
|
// 3. AM only uses the 2G token to increase on NM, but never uses the
|
||||||
|
// 4G token
|
||||||
|
// 4. NM reports 2G to RM, but RM sees its own allocation as 4G
|
||||||
|
// In this case:
|
||||||
|
// - Set the lastConfirmedResource as the maximum of
|
||||||
|
// nmContainerResource and lastConfirmedResource
|
||||||
|
// - Do NOT unregister the allocation expirer
|
||||||
|
// When the increase allocation expires, resource will be rolled back to
|
||||||
|
// the last confirmed resource.
|
||||||
|
container.lastConfirmedResource = Resources.componentwiseMax(
|
||||||
|
nmContainerResource, container.lastConfirmedResource);
|
||||||
|
} else {
|
||||||
|
// Something wrong happened, kill the container
|
||||||
|
LOG.warn("Something wrong happened, container size reported by NM"
|
||||||
|
+ " is not expected, ContainerID=" + container.containerId
|
||||||
|
+ " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:"
|
||||||
|
+ nmContainerResource);
|
||||||
|
container.eventHandler.handle(new RMNodeCleanContainerEvent(
|
||||||
|
container.nodeId, container.containerId));
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -577,20 +637,22 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
||||||
RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event;
|
RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event;
|
||||||
|
|
||||||
// Register with containerAllocationExpirer.
|
Resource targetResource = changeEvent.getTargetResource();
|
||||||
// For now, we assume timeout for increase is as same as container
|
Resource lastConfirmedResource = container.lastConfirmedResource;
|
||||||
// allocation.
|
|
||||||
if (!changeEvent.isIncrease()) {
|
if (!changeEvent.isIncrease()) {
|
||||||
// if this is a decrease request, if container was increased but not
|
// Only unregister from the containerAllocationExpirer when target
|
||||||
// told to NM, we can consider previous increase is cancelled,
|
// resource is less than or equal to the last confirmed resource.
|
||||||
// unregister from the containerAllocationExpirer
|
if (Resources.fitsIn(targetResource, lastConfirmedResource)) {
|
||||||
container.containerAllocationExpirer.unregister(container
|
container.lastConfirmedResource = targetResource;
|
||||||
.getContainerId());
|
container.containerAllocationExpirer.unregister(
|
||||||
|
new AllocationExpirationInfo(event.getContainerId()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
container.container.setResource(changeEvent.getTargetResource());
|
container.container.setResource(targetResource);
|
||||||
|
|
||||||
// We reach here means we either allocated increase reservation OR
|
// We reach here means we either allocated increase reservation OR
|
||||||
// decreased container, reservation will be cancelled anyway.
|
// decreased container, reservation will be cancelled anyway.
|
||||||
container.hasIncreaseReservation = false;
|
container.hasIncreaseReservation = false;
|
||||||
|
@ -662,8 +724,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
|
||||||
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
||||||
|
|
||||||
// Unregister from containerAllocationExpirer.
|
// Unregister from containerAllocationExpirer.
|
||||||
container.containerAllocationExpirer.unregister(container
|
container.containerAllocationExpirer.unregister(
|
||||||
.getContainerId());
|
new AllocationExpirationInfo(container.getContainerId()));
|
||||||
|
|
||||||
// Inform node
|
// Inform node
|
||||||
container.eventHandler.handle(new RMNodeCleanContainerEvent(
|
container.eventHandler.handle(new RMNodeCleanContainerEvent(
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
|
||||||
|
public class RMContainerNMDoneChangeResourceEvent extends RMContainerEvent {
|
||||||
|
|
||||||
|
private final Resource nmContainerResource;
|
||||||
|
|
||||||
|
public RMContainerNMDoneChangeResourceEvent(
|
||||||
|
ContainerId containerId, Resource nmContainerResource) {
|
||||||
|
super(containerId, RMContainerEventType.NM_DONE_CHANGE_RESOURCE);
|
||||||
|
this.nmContainerResource = nmContainerResource;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getNMContainerResource() {
|
||||||
|
return nmContainerResource;
|
||||||
|
}
|
||||||
|
}
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
|
||||||
|
@ -1308,14 +1309,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
launchedContainers.add(containerId);
|
launchedContainers.add(containerId);
|
||||||
newlyLaunchedContainers.add(remoteContainer);
|
newlyLaunchedContainers.add(remoteContainer);
|
||||||
// Unregister from containerAllocationExpirer.
|
// Unregister from containerAllocationExpirer.
|
||||||
containerAllocationExpirer.unregister(containerId);
|
containerAllocationExpirer.unregister(
|
||||||
|
new AllocationExpirationInfo(containerId));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// A finished container
|
// A finished container
|
||||||
launchedContainers.remove(containerId);
|
launchedContainers.remove(containerId);
|
||||||
completedContainers.add(remoteContainer);
|
completedContainers.add(remoteContainer);
|
||||||
// Unregister from containerAllocationExpirer.
|
// Unregister from containerAllocationExpirer.
|
||||||
containerAllocationExpirer.unregister(containerId);
|
containerAllocationExpirer.unregister(
|
||||||
|
new AllocationExpirationInfo(containerId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
|
if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -68,18 +67,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
|
||||||
|
.RMContainerNMDoneChangeResourceEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
|
||||||
|
.LeafQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
|
||||||
|
@ -245,7 +244,7 @@ public abstract class AbstractYarnScheduler
|
||||||
application.containerLaunchedOnNode(containerId, node.getNodeID());
|
application.containerLaunchedOnNode(containerId, node.getNodeID());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void containerIncreasedOnNode(ContainerId containerId,
|
protected void containerIncreasedOnNode(ContainerId containerId,
|
||||||
SchedulerNode node, Container increasedContainerReportedByNM) {
|
SchedulerNode node, Container increasedContainerReportedByNM) {
|
||||||
// Get the application for the finished container
|
// Get the application for the finished container
|
||||||
SchedulerApplicationAttempt application =
|
SchedulerApplicationAttempt application =
|
||||||
|
@ -258,39 +257,18 @@ public abstract class AbstractYarnScheduler
|
||||||
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
|
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
LeafQueue leafQueue = (LeafQueue) application.getQueue();
|
||||||
RMContainer rmContainer = getRMContainer(containerId);
|
synchronized (leafQueue) {
|
||||||
Resource rmContainerResource = rmContainer.getAllocatedResource();
|
RMContainer rmContainer = getRMContainer(containerId);
|
||||||
Resource nmContainerResource = increasedContainerReportedByNM.getResource();
|
if (rmContainer == null) {
|
||||||
|
// Some unknown container sneaked into the system. Kill it.
|
||||||
|
this.rmContext.getDispatcher().getEventHandler()
|
||||||
if (Resources.equals(nmContainerResource, rmContainerResource)){
|
.handle(new RMNodeCleanContainerEvent(
|
||||||
// NM reported expected container size, tell RMContainer. Which will stop
|
node.getNodeID(), containerId));
|
||||||
// container expire monitor
|
return;
|
||||||
rmContainer.handle(new RMContainerEvent(containerId,
|
}
|
||||||
RMContainerEventType.NM_DONE_CHANGE_RESOURCE));
|
rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(
|
||||||
} else if (Resources.fitsIn(getResourceCalculator(), clusterResource,
|
containerId, increasedContainerReportedByNM.getResource()));
|
||||||
nmContainerResource, rmContainerResource)) {
|
|
||||||
// when rmContainerResource >= nmContainerResource, we won't do anything,
|
|
||||||
// it is possible a container increased is issued by RM, but AM hasn't
|
|
||||||
// told NM.
|
|
||||||
} else if (Resources.fitsIn(getResourceCalculator(), clusterResource,
|
|
||||||
rmContainerResource, nmContainerResource)) {
|
|
||||||
// When rmContainerResource <= nmContainerResource, it could happen when a
|
|
||||||
// container decreased by RM before it is increased in NM.
|
|
||||||
|
|
||||||
// Tell NM to decrease the container
|
|
||||||
this.rmContext.getDispatcher().getEventHandler()
|
|
||||||
.handle(new RMNodeDecreaseContainerEvent(node.getNodeID(),
|
|
||||||
Arrays.asList(rmContainer.getContainer())));
|
|
||||||
} else {
|
|
||||||
// Something wrong happened, kill the container
|
|
||||||
LOG.warn("Something wrong happened, container size reported by NM"
|
|
||||||
+ " is not expected, ContainerID=" + containerId
|
|
||||||
+ " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:"
|
|
||||||
+ nmContainerResource);
|
|
||||||
this.rmContext.getDispatcher().getEventHandler()
|
|
||||||
.handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -104,6 +104,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
|
@ -1389,11 +1390,15 @@ public class CapacityScheduler extends
|
||||||
ContainerExpiredSchedulerEvent containerExpiredEvent =
|
ContainerExpiredSchedulerEvent containerExpiredEvent =
|
||||||
(ContainerExpiredSchedulerEvent) event;
|
(ContainerExpiredSchedulerEvent) event;
|
||||||
ContainerId containerId = containerExpiredEvent.getContainerId();
|
ContainerId containerId = containerExpiredEvent.getContainerId();
|
||||||
super.completedContainer(getRMContainer(containerId),
|
if (containerExpiredEvent.isIncrease()) {
|
||||||
SchedulerUtils.createAbnormalContainerStatus(
|
rollbackContainerResource(containerId);
|
||||||
containerId,
|
} else {
|
||||||
SchedulerUtils.EXPIRED_CONTAINER),
|
completedContainer(getRMContainer(containerId),
|
||||||
RMContainerEventType.EXPIRE);
|
SchedulerUtils.createAbnormalContainerStatus(
|
||||||
|
containerId,
|
||||||
|
SchedulerUtils.EXPIRED_CONTAINER),
|
||||||
|
RMContainerEventType.EXPIRE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case KILL_RESERVED_CONTAINER:
|
case KILL_RESERVED_CONTAINER:
|
||||||
|
@ -1495,7 +1500,33 @@ public class CapacityScheduler extends
|
||||||
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
|
LOG.info("Removed node " + nodeInfo.getNodeAddress() +
|
||||||
" clusterResource: " + clusterResource);
|
" clusterResource: " + clusterResource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void rollbackContainerResource(
|
||||||
|
ContainerId containerId) {
|
||||||
|
RMContainer rmContainer = getRMContainer(containerId);
|
||||||
|
if (rmContainer == null) {
|
||||||
|
LOG.info("Cannot rollback resource for container " + containerId +
|
||||||
|
". The container does not exist.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
|
||||||
|
if (application == null) {
|
||||||
|
LOG.info("Cannot rollback resource for container " + containerId +
|
||||||
|
". The application that the container belongs to does not exist.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.info("Roll back resource for container " + containerId);
|
||||||
|
LeafQueue leafQueue = (LeafQueue) application.getQueue();
|
||||||
|
synchronized(leafQueue) {
|
||||||
|
SchedulerNode schedulerNode =
|
||||||
|
getSchedulerNode(rmContainer.getAllocatedNode());
|
||||||
|
SchedContainerChangeRequest decreaseRequest =
|
||||||
|
new SchedContainerChangeRequest(this.rmContext, schedulerNode,
|
||||||
|
rmContainer, rmContainer.getLastConfirmedResource());
|
||||||
|
decreaseContainer(decreaseRequest, application);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Lock(CapacityScheduler.class)
|
@Lock(CapacityScheduler.class)
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void completedContainerInternal(
|
protected synchronized void completedContainerInternal(
|
||||||
|
|
|
@ -29,14 +29,24 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
|
||||||
public class ContainerExpiredSchedulerEvent extends SchedulerEvent {
|
public class ContainerExpiredSchedulerEvent extends SchedulerEvent {
|
||||||
|
|
||||||
private final ContainerId containerId;
|
private final ContainerId containerId;
|
||||||
|
private final boolean increase;
|
||||||
|
|
||||||
public ContainerExpiredSchedulerEvent(ContainerId containerId) {
|
public ContainerExpiredSchedulerEvent(ContainerId containerId) {
|
||||||
|
this(containerId, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContainerExpiredSchedulerEvent(
|
||||||
|
ContainerId containerId, boolean increase) {
|
||||||
super(SchedulerEventType.CONTAINER_EXPIRED);
|
super(SchedulerEventType.CONTAINER_EXPIRED);
|
||||||
this.containerId = containerId;
|
this.containerId = containerId;
|
||||||
|
this.increase = increase;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ContainerId getContainerId() {
|
public ContainerId getContainerId() {
|
||||||
return containerId;
|
return containerId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isIncrease() {
|
||||||
|
return increase;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,12 +20,14 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -103,6 +105,17 @@ public class MockNM {
|
||||||
nodeHeartbeat(conts, true);
|
nodeHeartbeat(conts, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void containerIncreaseStatus(Container container) throws Exception {
|
||||||
|
Map<ApplicationId, List<ContainerStatus>> conts = new HashMap<>();
|
||||||
|
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
|
||||||
|
container.getId(), ContainerState.RUNNING, "Success", 0,
|
||||||
|
container.getResource());
|
||||||
|
conts.put(container.getId().getApplicationAttemptId().getApplicationId(),
|
||||||
|
Collections.singletonList(containerStatus));
|
||||||
|
List<Container> increasedConts = Collections.singletonList(container);
|
||||||
|
nodeHeartbeat(conts, increasedConts, true, ++responseId);
|
||||||
|
}
|
||||||
|
|
||||||
public RegisterNodeManagerResponse registerNode() throws Exception {
|
public RegisterNodeManagerResponse registerNode() throws Exception {
|
||||||
return registerNode(null, null);
|
return registerNode(null, null);
|
||||||
}
|
}
|
||||||
|
@ -159,6 +172,12 @@ public class MockNM {
|
||||||
|
|
||||||
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||||
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
|
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
|
||||||
|
return nodeHeartbeat(conts, new ArrayList<Container>(), isHealthy, resId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||||
|
List<ContainerStatus>> conts, List<Container> increasedConts,
|
||||||
|
boolean isHealthy, int resId) throws Exception {
|
||||||
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
|
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
|
||||||
NodeStatus status = Records.newRecord(NodeStatus.class);
|
NodeStatus status = Records.newRecord(NodeStatus.class);
|
||||||
status.setResponseId(resId);
|
status.setResponseId(resId);
|
||||||
|
@ -167,6 +186,7 @@ public class MockNM {
|
||||||
Log.info("entry.getValue() " + entry.getValue());
|
Log.info("entry.getValue() " + entry.getValue());
|
||||||
status.setContainersStatuses(entry.getValue());
|
status.setContainersStatuses(entry.getValue());
|
||||||
}
|
}
|
||||||
|
status.setIncreasedContainers(increasedConts);
|
||||||
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
|
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
|
||||||
healthStatus.setHealthReport("");
|
healthStatus.setHealthReport("");
|
||||||
healthStatus.setIsNodeHealthy(isHealthy);
|
healthStatus.setIsNodeHealthy(isHealthy);
|
||||||
|
|
|
@ -47,6 +47,8 @@ import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
|
||||||
|
.AllocationExpirationInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
|
||||||
|
@ -949,10 +951,14 @@ public class TestRMNodeTransitions {
|
||||||
ApplicationAttemptId.newInstance(appId, 1);
|
ApplicationAttemptId.newInstance(appId, 1);
|
||||||
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
|
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
|
||||||
ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
|
ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
|
||||||
mockExpirer.register(containerId1);
|
AllocationExpirationInfo expirationInfo1 =
|
||||||
mockExpirer.register(containerId2);
|
new AllocationExpirationInfo(containerId1);
|
||||||
verify(mockExpirer).register(containerId1);
|
AllocationExpirationInfo expirationInfo2 =
|
||||||
verify(mockExpirer).register(containerId2);
|
new AllocationExpirationInfo(containerId2);
|
||||||
|
mockExpirer.register(expirationInfo1);
|
||||||
|
mockExpirer.register(expirationInfo2);
|
||||||
|
verify(mockExpirer).register(expirationInfo1);
|
||||||
|
verify(mockExpirer).register(expirationInfo2);
|
||||||
((RMContextImpl) rmContext).setContainerAllocationExpirer(mockExpirer);
|
((RMContextImpl) rmContext).setContainerAllocationExpirer(mockExpirer);
|
||||||
RMNodeImpl rmNode = getRunningNode();
|
RMNodeImpl rmNode = getRunningNode();
|
||||||
ContainerStatus status1 =
|
ContainerStatus status1 =
|
||||||
|
@ -966,7 +972,7 @@ public class TestRMNodeTransitions {
|
||||||
statusList.add(status2);
|
statusList.add(status2);
|
||||||
RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(statusList);
|
RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(statusList);
|
||||||
rmNode.handle(statusEvent);
|
rmNode.handle(statusEvent);
|
||||||
verify(mockExpirer).unregister(containerId1);
|
verify(mockExpirer).unregister(expirationInfo1);
|
||||||
verify(mockExpirer).unregister(containerId2);
|
verify(mockExpirer).unregister(expirationInfo2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
|
@ -249,119 +250,12 @@ public class TestRMContainerImpl {
|
||||||
rmContainer.handle(new RMContainerFinishedEvent(containerId,
|
rmContainer.handle(new RMContainerFinishedEvent(containerId,
|
||||||
containerStatus, RMContainerEventType.EXPIRE));
|
containerStatus, RMContainerEventType.EXPIRE));
|
||||||
drainDispatcher.await();
|
drainDispatcher.await();
|
||||||
assertEquals(RMContainerState.EXPIRED, rmContainer.getState());
|
|
||||||
verify(writer, times(1)).containerFinished(any(RMContainer.class));
|
|
||||||
verify(publisher, times(1)).containerFinished(any(RMContainer.class),
|
|
||||||
anyLong());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testExpireAfterIncreased(boolean acquired) {
|
|
||||||
/*
|
|
||||||
* Similar to previous test, a container is increased but not acquired by
|
|
||||||
* AM. In this case, if a container is expired, the container should be
|
|
||||||
* finished.
|
|
||||||
*/
|
|
||||||
DrainDispatcher drainDispatcher = new DrainDispatcher();
|
|
||||||
EventHandler<RMAppAttemptEvent> appAttemptEventHandler =
|
|
||||||
mock(EventHandler.class);
|
|
||||||
EventHandler generic = mock(EventHandler.class);
|
|
||||||
drainDispatcher.register(RMAppAttemptEventType.class,
|
|
||||||
appAttemptEventHandler);
|
|
||||||
drainDispatcher.register(RMNodeEventType.class, generic);
|
|
||||||
drainDispatcher.init(new YarnConfiguration());
|
|
||||||
drainDispatcher.start();
|
|
||||||
NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
|
|
||||||
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
|
|
||||||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
|
||||||
appId, 1);
|
|
||||||
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
|
|
||||||
ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
|
|
||||||
|
|
||||||
Resource resource = BuilderUtils.newResource(512, 1);
|
|
||||||
Priority priority = BuilderUtils.newPriority(5);
|
|
||||||
|
|
||||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
|
||||||
"host:3465", resource, priority, null);
|
|
||||||
|
|
||||||
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
|
|
||||||
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
|
|
||||||
RMContext rmContext = mock(RMContext.class);
|
|
||||||
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
|
|
||||||
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
|
|
||||||
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
|
|
||||||
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
|
|
||||||
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
|
|
||||||
ConcurrentMap<ApplicationId, RMApp> apps =
|
|
||||||
new ConcurrentHashMap<ApplicationId, RMApp>();
|
|
||||||
apps.put(appId, mock(RMApp.class));
|
|
||||||
when(rmContext.getRMApps()).thenReturn(apps);
|
|
||||||
RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
|
|
||||||
nodeId, "user", rmContext);
|
|
||||||
|
|
||||||
assertEquals(RMContainerState.NEW, rmContainer.getState());
|
|
||||||
assertEquals(resource, rmContainer.getAllocatedResource());
|
|
||||||
assertEquals(nodeId, rmContainer.getAllocatedNode());
|
|
||||||
assertEquals(priority, rmContainer.getAllocatedPriority());
|
|
||||||
verify(writer).containerStarted(any(RMContainer.class));
|
|
||||||
verify(publisher).containerCreated(any(RMContainer.class), anyLong());
|
|
||||||
|
|
||||||
rmContainer.handle(new RMContainerEvent(containerId,
|
|
||||||
RMContainerEventType.START));
|
|
||||||
drainDispatcher.await();
|
|
||||||
assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
|
|
||||||
|
|
||||||
rmContainer.handle(new RMContainerEvent(containerId,
|
|
||||||
RMContainerEventType.ACQUIRED));
|
|
||||||
drainDispatcher.await();
|
|
||||||
assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
|
|
||||||
|
|
||||||
rmContainer.handle(new RMContainerEvent(containerId,
|
|
||||||
RMContainerEventType.LAUNCHED));
|
|
||||||
drainDispatcher.await();
|
|
||||||
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
|
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
|
||||||
assertEquals(
|
verify(writer, never()).containerFinished(any(RMContainer.class));
|
||||||
"http://host:3465/node/containerlogs/container_1_0001_01_000001/user",
|
verify(publisher, never()).containerFinished(any(RMContainer.class),
|
||||||
rmContainer.getLogURL());
|
|
||||||
|
|
||||||
// newResource is more than the old resource
|
|
||||||
Resource newResource = BuilderUtils.newResource(1024, 2);
|
|
||||||
rmContainer.handle(new RMContainerChangeResourceEvent(containerId,
|
|
||||||
newResource, true));
|
|
||||||
|
|
||||||
if (acquired) {
|
|
||||||
rmContainer
|
|
||||||
.handle(new RMContainerUpdatesAcquiredEvent(containerId, true));
|
|
||||||
drainDispatcher.await();
|
|
||||||
// status is still RUNNING since this is a increased container acquired by
|
|
||||||
// AM
|
|
||||||
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
|
|
||||||
}
|
|
||||||
|
|
||||||
// In RUNNING state. Verify EXPIRE and associated actions.
|
|
||||||
reset(appAttemptEventHandler);
|
|
||||||
ContainerStatus containerStatus = SchedulerUtils
|
|
||||||
.createAbnormalContainerStatus(containerId,
|
|
||||||
SchedulerUtils.EXPIRED_CONTAINER);
|
|
||||||
rmContainer.handle(new RMContainerFinishedEvent(containerId,
|
|
||||||
containerStatus, RMContainerEventType.EXPIRE));
|
|
||||||
drainDispatcher.await();
|
|
||||||
assertEquals(RMContainerState.EXPIRED, rmContainer.getState());
|
|
||||||
|
|
||||||
// Container will be finished only when it is acquired by AM after increase,
|
|
||||||
// we will only notify expirer when it is acquired by AM.
|
|
||||||
verify(writer, times(1)).containerFinished(any(RMContainer.class));
|
|
||||||
verify(publisher, times(1)).containerFinished(any(RMContainer.class),
|
|
||||||
anyLong());
|
anyLong());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testExpireAfterContainerResourceIncreased() throws Exception {
|
|
||||||
// expire after increased and acquired by AM
|
|
||||||
testExpireAfterIncreased(true);
|
|
||||||
// expire after increased but not acquired by AM
|
|
||||||
testExpireAfterIncreased(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExistenceOfResourceRequestInRMContainer() throws Exception {
|
public void testExistenceOfResourceRequestInRMContainer() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
|
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
|
||||||
|
@ -143,7 +142,8 @@ public class TestContainerResizing {
|
||||||
.newInstance(containerId1, Resources.createResource(3 * GB))),
|
.newInstance(containerId1, Resources.createResource(3 * GB))),
|
||||||
null);
|
null);
|
||||||
|
|
||||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
|
|
||||||
checkPendingResource(rm1, "default", 2 * GB, null);
|
checkPendingResource(rm1, "default", 2 * GB, null);
|
||||||
Assert.assertEquals(2 * GB,
|
Assert.assertEquals(2 * GB,
|
||||||
|
@ -183,7 +183,8 @@ public class TestContainerResizing {
|
||||||
// app1 -> a1
|
// app1 -> a1
|
||||||
RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "default");
|
RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "default");
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
|
|
||||||
checkUsedResource(rm1, "default", 3 * GB, null);
|
checkUsedResource(rm1, "default", 3 * GB, null);
|
||||||
Assert.assertEquals(3 * GB,
|
Assert.assertEquals(3 * GB,
|
||||||
|
@ -242,7 +243,8 @@ public class TestContainerResizing {
|
||||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
|
|
||||||
// Allocate two more containers
|
// Allocate two more containers
|
||||||
am1.allocate(
|
am1.allocate(
|
||||||
|
@ -346,7 +348,8 @@ public class TestContainerResizing {
|
||||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
|
|
||||||
// Allocate 1 container
|
// Allocate 1 container
|
||||||
am1.allocate(
|
am1.allocate(
|
||||||
|
@ -422,7 +425,8 @@ public class TestContainerResizing {
|
||||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
|
|
||||||
// Allocate two more containers
|
// Allocate two more containers
|
||||||
am1.allocate(
|
am1.allocate(
|
||||||
|
@ -532,7 +536,8 @@ public class TestContainerResizing {
|
||||||
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "default");
|
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "default");
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
|
|
||||||
// Allocate two more containers
|
// Allocate two more containers
|
||||||
am1.allocate(
|
am1.allocate(
|
||||||
|
@ -593,8 +598,8 @@ public class TestContainerResizing {
|
||||||
am1.allocate(null, Arrays.asList(containerId2));
|
am1.allocate(null, Arrays.asList(containerId2));
|
||||||
// am1 asks to change its AM container from 2G to 1G (decrease)
|
// am1 asks to change its AM container from 2G to 1G (decrease)
|
||||||
am1.sendContainerResizingRequest(null, Arrays.asList(
|
am1.sendContainerResizingRequest(null, Arrays.asList(
|
||||||
ContainerResourceChangeRequest
|
ContainerResourceChangeRequest
|
||||||
.newInstance(containerId1, Resources.createResource(1 * GB))));
|
.newInstance(containerId1, Resources.createResource(1 * GB))));
|
||||||
// Trigger a node heartbeat..
|
// Trigger a node heartbeat..
|
||||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
|
||||||
|
@ -643,7 +648,8 @@ public class TestContainerResizing {
|
||||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
|
|
||||||
// Allocate two more containers
|
// Allocate two more containers
|
||||||
am1.allocate(
|
am1.allocate(
|
||||||
|
@ -740,7 +746,8 @@ public class TestContainerResizing {
|
||||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
|
|
||||||
// Allocate two more containers
|
// Allocate two more containers
|
||||||
am1.allocate(
|
am1.allocate(
|
||||||
|
@ -862,7 +869,8 @@ public class TestContainerResizing {
|
||||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
|
ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
|
||||||
|
|
||||||
// Container 2, 3 (priority=3)
|
// Container 2, 3 (priority=3)
|
||||||
|
@ -942,7 +950,8 @@ public class TestContainerResizing {
|
||||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
|
ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
|
||||||
|
|
||||||
// Container 2, 3 (priority=3)
|
// Container 2, 3 (priority=3)
|
||||||
|
@ -1021,7 +1030,8 @@ public class TestContainerResizing {
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
|
||||||
// making sure resource is allocated
|
// making sure resource is allocated
|
||||||
checkUsedResource(rm, "default", 3 * GB, null);
|
checkUsedResource(rm, "default", 3 * GB, null);
|
||||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId());
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm, app1.getApplicationId());
|
||||||
Assert.assertEquals(3 * GB,
|
Assert.assertEquals(3 * GB,
|
||||||
app.getAppAttemptResourceUsage().getUsed().getMemory());
|
app.getAppAttemptResourceUsage().getUsed().getMemory());
|
||||||
// making sure container is launched
|
// making sure container is launched
|
||||||
|
@ -1113,10 +1123,4 @@ public class TestContainerResizing {
|
||||||
Assert
|
Assert
|
||||||
.assertEquals(expectedMemory, node.getAvailableResource().getMemory());
|
.assertEquals(expectedMemory, node.getAvailableResource().getMemory());
|
||||||
}
|
}
|
||||||
|
|
||||||
private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
|
|
||||||
ApplicationId appId) {
|
|
||||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
|
||||||
return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,443 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class TestIncreaseAllocationExpirer {
|
||||||
|
private final int GB = 1024;
|
||||||
|
private YarnConfiguration conf;
|
||||||
|
RMNodeLabelsManager mgr;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
conf = new YarnConfiguration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
mgr = new NullRMNodeLabelsManager();
|
||||||
|
mgr.init(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerIsRemovedFromAllocationExpirer()
|
||||||
|
throws Exception {
|
||||||
|
/**
|
||||||
|
* 1. Allocate 1 container: containerId2 (1G)
|
||||||
|
* 2. Increase resource of containerId2: 1G -> 3G
|
||||||
|
* 3. AM acquires the token
|
||||||
|
* 4. AM uses the token
|
||||||
|
* 5. Verify containerId2 is removed from allocation expirer such
|
||||||
|
* that it still runs fine after allocation expiration interval
|
||||||
|
*/
|
||||||
|
// Set the allocation expiration to 5 seconds
|
||||||
|
conf.setLong(
|
||||||
|
YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
// Submit an application
|
||||||
|
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
// Report AM container status RUNNING to remove it from expirer
|
||||||
|
nm1.nodeHeartbeat(
|
||||||
|
app1.getCurrentAppAttempt()
|
||||||
|
.getAppAttemptId(), 1, ContainerState.RUNNING);
|
||||||
|
// AM request a new container
|
||||||
|
am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList<ContainerId>());
|
||||||
|
ContainerId containerId2 =
|
||||||
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
|
||||||
|
// AM acquire a new container to start container allocation expirer
|
||||||
|
List<Container> containers =
|
||||||
|
am1.allocate(null, null).getAllocatedContainers();
|
||||||
|
Assert.assertEquals(containerId2, containers.get(0).getId());
|
||||||
|
Assert.assertNotNull(containers.get(0).getContainerToken());
|
||||||
|
checkUsedResource(rm1, "default", 2 * GB, null);
|
||||||
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
|
Assert.assertEquals(2 * GB,
|
||||||
|
app.getAppAttemptResourceUsage().getUsed().getMemory());
|
||||||
|
verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB);
|
||||||
|
// Report container status
|
||||||
|
nm1.nodeHeartbeat(
|
||||||
|
app1.getCurrentAppAttempt()
|
||||||
|
.getAppAttemptId(), 2, ContainerState.RUNNING);
|
||||||
|
// Wait until container status is RUNNING, and is removed from
|
||||||
|
// allocation expirer
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
||||||
|
// am1 asks to increase containerId2 from 1GB to 3GB
|
||||||
|
am1.sendContainerResizingRequest(Collections.singletonList(
|
||||||
|
ContainerResourceChangeRequest.newInstance(
|
||||||
|
containerId2, Resources.createResource(3 * GB))), null);
|
||||||
|
// Kick off scheduling and sleep for 1 second;
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
Thread.sleep(1000);
|
||||||
|
// Start container increase allocation expirer"
|
||||||
|
am1.allocate(null, null);
|
||||||
|
// Remember the resource in order to report status
|
||||||
|
Resource resource = Resources.clone(
|
||||||
|
rm1.getResourceScheduler().getRMContainer(containerId2)
|
||||||
|
.getAllocatedResource());
|
||||||
|
nm1.containerIncreaseStatus(getContainer(rm1, containerId2, resource));
|
||||||
|
// Wait long enough and verify that the container was removed
|
||||||
|
// from allocation expirer, and the container is still running
|
||||||
|
Thread.sleep(10000);
|
||||||
|
Assert.assertEquals(RMContainerState.RUNNING,
|
||||||
|
rm1.getResourceScheduler().getRMContainer(containerId2).getState());
|
||||||
|
// Verify container size is 3G
|
||||||
|
Assert.assertEquals(
|
||||||
|
3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
|
||||||
|
.getAllocatedResource().getMemory());
|
||||||
|
// Verify total resource usage
|
||||||
|
checkUsedResource(rm1, "default", 4 * GB, null);
|
||||||
|
Assert.assertEquals(4 * GB,
|
||||||
|
app.getAppAttemptResourceUsage().getUsed().getMemory());
|
||||||
|
// Verify available resource
|
||||||
|
verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB);
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerIncreaseAllocationExpiration()
|
||||||
|
throws Exception {
|
||||||
|
/**
|
||||||
|
* 1. Allocate 1 container: containerId2 (1G)
|
||||||
|
* 2. Increase resource of containerId2: 1G -> 3G
|
||||||
|
* 3. AM acquires the token
|
||||||
|
* 4. AM does not use the token
|
||||||
|
* 5. Verify containerId2's resource usage falls back to
|
||||||
|
* 1G after the increase token expires
|
||||||
|
*/
|
||||||
|
// Set the allocation expiration to 5 seconds
|
||||||
|
conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
nm1.nodeHeartbeat(
|
||||||
|
app1.getCurrentAppAttempt()
|
||||||
|
.getAppAttemptId(), 1, ContainerState.RUNNING);
|
||||||
|
am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList<ContainerId>());
|
||||||
|
ContainerId containerId2 =
|
||||||
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
|
||||||
|
List<Container> containers =
|
||||||
|
am1.allocate(null, null).getAllocatedContainers();
|
||||||
|
Assert.assertEquals(containerId2, containers.get(0).getId());
|
||||||
|
Assert.assertNotNull(containers.get(0).getContainerToken());
|
||||||
|
checkUsedResource(rm1, "default", 2 * GB, null);
|
||||||
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
|
Assert.assertEquals(2 * GB,
|
||||||
|
app.getAppAttemptResourceUsage().getUsed().getMemory());
|
||||||
|
verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB);
|
||||||
|
nm1.nodeHeartbeat(
|
||||||
|
app1.getCurrentAppAttempt()
|
||||||
|
.getAppAttemptId(), 2, ContainerState.RUNNING);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
||||||
|
// am1 asks to increase containerId2 from 1GB to 3GB
|
||||||
|
am1.sendContainerResizingRequest(Collections.singletonList(
|
||||||
|
ContainerResourceChangeRequest.newInstance(
|
||||||
|
containerId2, Resources.createResource(3 * GB))), null);
|
||||||
|
// Kick off scheduling and wait for 1 second;
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
Thread.sleep(1000);
|
||||||
|
// Start container increase allocation expirer
|
||||||
|
am1.allocate(null, null);
|
||||||
|
// Verify resource usage
|
||||||
|
checkUsedResource(rm1, "default", 4 * GB, null);
|
||||||
|
Assert.assertEquals(4 * GB,
|
||||||
|
app.getAppAttemptResourceUsage().getUsed().getMemory());
|
||||||
|
verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB);
|
||||||
|
// Wait long enough for the increase token to expire, and for the roll
|
||||||
|
// back action to complete
|
||||||
|
Thread.sleep(10000);
|
||||||
|
// Verify container size is 1G
|
||||||
|
Assert.assertEquals(
|
||||||
|
1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
|
||||||
|
.getAllocatedResource().getMemory());
|
||||||
|
// Verify total resource usage is 2G
|
||||||
|
checkUsedResource(rm1, "default", 2 * GB, null);
|
||||||
|
Assert.assertEquals(2 * GB,
|
||||||
|
app.getAppAttemptResourceUsage().getUsed().getMemory());
|
||||||
|
// Verify available resource is rolled back to 18GB
|
||||||
|
verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 18 * GB);
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConsecutiveContainerIncreaseAllocationExpiration()
|
||||||
|
throws Exception {
|
||||||
|
/**
|
||||||
|
* 1. Allocate 1 container: containerId2 (1G)
|
||||||
|
* 2. Increase resource of containerId2: 1G -> 3G
|
||||||
|
* 3. AM acquires the token
|
||||||
|
* 4. Increase resource of containerId2 again: 3G -> 5G
|
||||||
|
* 5. AM acquires the token
|
||||||
|
* 6. AM uses the first token to increase the container in NM to 3G
|
||||||
|
* 7. AM NEVER uses the second token
|
||||||
|
* 8. Verify containerId2 eventually is allocated 3G after token expires
|
||||||
|
* 9. Verify NM eventually uses 3G for containerId2
|
||||||
|
*/
|
||||||
|
// Set the allocation expiration to 5 seconds
|
||||||
|
conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
// Submit an application
|
||||||
|
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
nm1.nodeHeartbeat(
|
||||||
|
app1.getCurrentAppAttempt()
|
||||||
|
.getAppAttemptId(), 1, ContainerState.RUNNING);
|
||||||
|
// AM request a new container
|
||||||
|
am1.allocate("127.0.0.1", 1 * GB, 1, new ArrayList<ContainerId>());
|
||||||
|
ContainerId containerId2 =
|
||||||
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
|
||||||
|
// AM acquire a new container to start container allocation expirer
|
||||||
|
am1.allocate(null, null).getAllocatedContainers();
|
||||||
|
// Report container status
|
||||||
|
nm1.nodeHeartbeat(
|
||||||
|
app1.getCurrentAppAttempt()
|
||||||
|
.getAppAttemptId(), 2, ContainerState.RUNNING);
|
||||||
|
// Wait until container status is RUNNING, and is removed from
|
||||||
|
// allocation expirer
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
||||||
|
// am1 asks to change containerId2 from 1GB to 3GB
|
||||||
|
am1.sendContainerResizingRequest(Collections.singletonList(
|
||||||
|
ContainerResourceChangeRequest.newInstance(
|
||||||
|
containerId2, Resources.createResource(3 * GB))), null);
|
||||||
|
// Kick off scheduling and sleep for 1 second to
|
||||||
|
// make sure the allocation is done
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
Thread.sleep(1000);
|
||||||
|
// Start container increase allocation expirer
|
||||||
|
am1.allocate(null, null);
|
||||||
|
// Remember the resource (3G) in order to report status
|
||||||
|
Resource resource1 = Resources.clone(
|
||||||
|
rm1.getResourceScheduler().getRMContainer(containerId2)
|
||||||
|
.getAllocatedResource());
|
||||||
|
// am1 asks to change containerId2 from 3GB to 5GB
|
||||||
|
am1.sendContainerResizingRequest(Collections.singletonList(
|
||||||
|
ContainerResourceChangeRequest.newInstance(
|
||||||
|
containerId2, Resources.createResource(5 * GB))), null);
|
||||||
|
// Kick off scheduling and sleep for 1 second to
|
||||||
|
// make sure the allocation is done
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
Thread.sleep(1000);
|
||||||
|
// Reset container increase allocation expirer
|
||||||
|
am1.allocate(null, null);
|
||||||
|
// Verify current resource allocation in RM
|
||||||
|
checkUsedResource(rm1, "default", 6 * GB, null);
|
||||||
|
FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
|
||||||
|
rm1, app1.getApplicationId());
|
||||||
|
Assert.assertEquals(6 * GB,
|
||||||
|
app.getAppAttemptResourceUsage().getUsed().getMemory());
|
||||||
|
// Verify available resource is now reduced to 14GB
|
||||||
|
verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 14 * GB);
|
||||||
|
// Use the first token (3G)
|
||||||
|
nm1.containerIncreaseStatus(getContainer(rm1, containerId2, resource1));
|
||||||
|
// Wait long enough for the second token (5G) to expire, and verify that
|
||||||
|
// the roll back action is completed as expected
|
||||||
|
Thread.sleep(10000);
|
||||||
|
// Verify container size is rolled back to 3G
|
||||||
|
Assert.assertEquals(
|
||||||
|
3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
|
||||||
|
.getAllocatedResource().getMemory());
|
||||||
|
// Verify total resource usage is 4G
|
||||||
|
checkUsedResource(rm1, "default", 4 * GB, null);
|
||||||
|
Assert.assertEquals(4 * GB,
|
||||||
|
app.getAppAttemptResourceUsage().getUsed().getMemory());
|
||||||
|
// Verify available resource is rolled back to 14GB
|
||||||
|
verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB);
|
||||||
|
// Verify NM receives the decrease message (3G)
|
||||||
|
List<Container> containersToDecrease =
|
||||||
|
nm1.nodeHeartbeat(true).getContainersToDecrease();
|
||||||
|
Assert.assertEquals(1, containersToDecrease.size());
|
||||||
|
Assert.assertEquals(
|
||||||
|
3 * GB, containersToDecrease.get(0).getResource().getMemory());
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDecreaseAfterIncreaseWithAllocationExpiration()
|
||||||
|
throws Exception {
|
||||||
|
/**
|
||||||
|
* 1. Allocate three containers: containerId2, containerId3, containerId4
|
||||||
|
* 2. Increase resource of containerId2: 3G -> 6G
|
||||||
|
* 3. Increase resource of containerId3: 3G -> 6G
|
||||||
|
* 4. Increase resource of containerId4: 3G -> 6G
|
||||||
|
* 5. Do NOT use the increase tokens for containerId2 and containerId3
|
||||||
|
* 6. Decrease containerId2: 6G -> 2G (i.e., below last confirmed resource)
|
||||||
|
* 7. Decrease containerId3: 6G -> 4G (i.e., above last confirmed resource)
|
||||||
|
* 8. Decrease containerId4: 6G -> 4G (i.e., above last confirmed resource)
|
||||||
|
* 9. Use token for containerId4 to increase containerId4 on NM to 6G
|
||||||
|
* 10. Verify containerId2 eventually uses 2G (removed from expirer)
|
||||||
|
* 11. verify containerId3 eventually uses 3G (increase token expires)
|
||||||
|
* 12. Verify containerId4 eventually uses 4G (removed from expirer)
|
||||||
|
* 13. Verify NM evetually uses 3G for containerId3, 4G for containerId4
|
||||||
|
*/
|
||||||
|
// Set the allocation expiration to 5 seconds
|
||||||
|
conf.setLong(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, 5000);
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
// Submit an application
|
||||||
|
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 20 * GB);
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
nm1.nodeHeartbeat(
|
||||||
|
app1.getCurrentAppAttempt()
|
||||||
|
.getAppAttemptId(), 1, ContainerState.RUNNING);
|
||||||
|
// AM request two new continers
|
||||||
|
am1.allocate("127.0.0.1", 3 * GB, 3, new ArrayList<ContainerId>());
|
||||||
|
ContainerId containerId2 =
|
||||||
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
|
||||||
|
ContainerId containerId3 =
|
||||||
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
|
||||||
|
rm1.waitForState(nm1, containerId3, RMContainerState.ALLOCATED);
|
||||||
|
ContainerId containerId4 =
|
||||||
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
|
||||||
|
rm1.waitForState(nm1, containerId4, RMContainerState.ALLOCATED);
|
||||||
|
// AM acquires tokens to start container allocation expirer
|
||||||
|
List<Container> containers =
|
||||||
|
am1.allocate(null, null).getAllocatedContainers();
|
||||||
|
Assert.assertEquals(3, containers.size());
|
||||||
|
Assert.assertNotNull(containers.get(0).getContainerToken());
|
||||||
|
Assert.assertNotNull(containers.get(1).getContainerToken());
|
||||||
|
Assert.assertNotNull(containers.get(2).getContainerToken());
|
||||||
|
// Report container status
|
||||||
|
nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
2, ContainerState.RUNNING);
|
||||||
|
nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
3, ContainerState.RUNNING);
|
||||||
|
nm1.nodeHeartbeat(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
4, ContainerState.RUNNING);
|
||||||
|
// Wait until container status becomes RUNNING
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
||||||
|
rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
|
||||||
|
rm1.waitForState(nm1, containerId4, RMContainerState.RUNNING);
|
||||||
|
// am1 asks to change containerId2 and containerId3 from 1GB to 3GB
|
||||||
|
List<ContainerResourceChangeRequest> increaseRequests = new ArrayList<>();
|
||||||
|
increaseRequests.add(ContainerResourceChangeRequest.newInstance(
|
||||||
|
containerId2, Resources.createResource(6 * GB)));
|
||||||
|
increaseRequests.add(ContainerResourceChangeRequest.newInstance(
|
||||||
|
containerId3, Resources.createResource(6 * GB)));
|
||||||
|
increaseRequests.add(ContainerResourceChangeRequest.newInstance(
|
||||||
|
containerId4, Resources.createResource(6 * GB)));
|
||||||
|
am1.sendContainerResizingRequest(increaseRequests, null);
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
Thread.sleep(1000);
|
||||||
|
// Start container increase allocation expirer
|
||||||
|
am1.allocate(null, null);
|
||||||
|
// Decrease containers
|
||||||
|
List<ContainerResourceChangeRequest> decreaseRequests = new ArrayList<>();
|
||||||
|
decreaseRequests.add(ContainerResourceChangeRequest.newInstance(
|
||||||
|
containerId2, Resources.createResource(2 * GB)));
|
||||||
|
decreaseRequests.add(ContainerResourceChangeRequest.newInstance(
|
||||||
|
containerId3, Resources.createResource(4 * GB)));
|
||||||
|
decreaseRequests.add(ContainerResourceChangeRequest.newInstance(
|
||||||
|
containerId4, Resources.createResource(4 * GB)));
|
||||||
|
AllocateResponse response =
|
||||||
|
am1.sendContainerResizingRequest(null, decreaseRequests);
|
||||||
|
// Verify containers are decreased in scheduler
|
||||||
|
Assert.assertEquals(3, response.getDecreasedContainers().size());
|
||||||
|
// Use the token for containerId4 on NM (6G). This should set the last
|
||||||
|
// confirmed resource to 4G, and cancel the allocation expirer
|
||||||
|
nm1.containerIncreaseStatus(getContainer(
|
||||||
|
rm1, containerId4, Resources.createResource(6 * GB)));
|
||||||
|
// Wait for containerId3 token to expire,
|
||||||
|
Thread.sleep(10000);
|
||||||
|
Assert.assertEquals(
|
||||||
|
2 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
|
||||||
|
.getAllocatedResource().getMemory());
|
||||||
|
Assert.assertEquals(
|
||||||
|
3 * GB, rm1.getResourceScheduler().getRMContainer(containerId3)
|
||||||
|
.getAllocatedResource().getMemory());
|
||||||
|
Assert.assertEquals(
|
||||||
|
4 * GB, rm1.getResourceScheduler().getRMContainer(containerId4)
|
||||||
|
.getAllocatedResource().getMemory());
|
||||||
|
// Verify NM receives 2 decrease message
|
||||||
|
List<Container> containersToDecrease =
|
||||||
|
nm1.nodeHeartbeat(true).getContainersToDecrease();
|
||||||
|
Assert.assertEquals(2, containersToDecrease.size());
|
||||||
|
// Sort the list to make sure containerId3 is the first
|
||||||
|
Collections.sort(containersToDecrease);
|
||||||
|
Assert.assertEquals(
|
||||||
|
3 * GB, containersToDecrease.get(0).getResource().getMemory());
|
||||||
|
Assert.assertEquals(
|
||||||
|
4 * GB, containersToDecrease.get(1).getResource().getMemory());
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkUsedResource(MockRM rm, String queueName, int memory,
|
||||||
|
String label) {
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
CSQueue queue = cs.getQueue(queueName);
|
||||||
|
Assert.assertEquals(memory,
|
||||||
|
queue.getQueueResourceUsage()
|
||||||
|
.getUsed(label == null ? RMNodeLabelsManager.NO_LABEL : label)
|
||||||
|
.getMemory());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyAvailableResourceOfSchedulerNode(MockRM rm, NodeId nodeId,
|
||||||
|
int expectedMemory) {
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
SchedulerNode node = cs.getNode(nodeId);
|
||||||
|
Assert
|
||||||
|
.assertEquals(expectedMemory, node.getAvailableResource().getMemory());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Container getContainer(
|
||||||
|
MockRM rm, ContainerId containerId, Resource resource) {
|
||||||
|
RMContainer rmContainer = rm.getResourceScheduler()
|
||||||
|
.getRMContainer(containerId);
|
||||||
|
return Container.newInstance(
|
||||||
|
containerId, rmContainer.getAllocatedNode(), null,
|
||||||
|
resource, null, null);
|
||||||
|
}
|
||||||
|
}
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.event.Event;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||||
|
@ -91,10 +92,10 @@ public class TestUtils {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// No op
|
// No op
|
||||||
ContainerAllocationExpirer cae =
|
ContainerAllocationExpirer cae =
|
||||||
new ContainerAllocationExpirer(nullDispatcher);
|
new ContainerAllocationExpirer(nullDispatcher);
|
||||||
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
|
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
|
||||||
RMContextImpl rmContext =
|
RMContextImpl rmContext =
|
||||||
|
@ -122,7 +123,7 @@ public class TestUtils {
|
||||||
return (Resource) args[1];
|
return (Resource) args[1];
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
rmContext.setNodeLabelManager(nlm);
|
rmContext.setNodeLabelManager(nlm);
|
||||||
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
|
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
|
||||||
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
|
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
|
||||||
|
@ -349,4 +350,10 @@ public class TestUtils {
|
||||||
conf.setDefaultNodeLabelExpression(B, "y");
|
conf.setDefaultNodeLabelExpression(B, "y");
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
|
||||||
|
ApplicationId appId) {
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue