YARN-3354. Add node label expression in ContainerTokenIdentifier to support RM recovery. Contributed by Wangda Tan

(cherry picked from commit 1b89a3e173)
This commit is contained in:
Jian He 2015-04-15 13:57:06 -07:00
parent 58f99b740a
commit 6fed2c2a79
17 changed files with 408 additions and 27 deletions

View File

@ -33,6 +33,9 @@ Release 2.8.0 - UNRELEASED
YARN-3326. Support RESTful API for getLabelsToNodes. (Naganarasimha G R YARN-3326. Support RESTful API for getLabelsToNodes. (Naganarasimha G R
via ozawa) via ozawa)
YARN-3354. Add node label expression in ContainerTokenIdentifier to support
RM recovery. (Wangda Tan via jianhe)
IMPROVEMENTS IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA YARN-1880. Cleanup TestApplicationClientProtocolOnHA

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto; import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat;
@ -64,13 +65,14 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
String hostName, String appSubmitter, Resource r, long expiryTimeStamp, String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
int masterKeyId, long rmIdentifier, Priority priority, long creationTime) { int masterKeyId, long rmIdentifier, Priority priority, long creationTime) {
this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId, this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
rmIdentifier, priority, creationTime, null); rmIdentifier, priority, creationTime, null,
CommonNodeLabelsManager.NO_LABEL);
} }
public ContainerTokenIdentifier(ContainerId containerID, String hostName, public ContainerTokenIdentifier(ContainerId containerID, String hostName,
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
long rmIdentifier, Priority priority, long creationTime, long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext) { LogAggregationContext logAggregationContext, String nodeLabelExpression) {
ContainerTokenIdentifierProto.Builder builder = ContainerTokenIdentifierProto.Builder builder =
ContainerTokenIdentifierProto.newBuilder(); ContainerTokenIdentifierProto.newBuilder();
if (containerID != null) { if (containerID != null) {
@ -93,6 +95,11 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
builder.setLogAggregationContext( builder.setLogAggregationContext(
((LogAggregationContextPBImpl)logAggregationContext).getProto()); ((LogAggregationContextPBImpl)logAggregationContext).getProto());
} }
if (nodeLabelExpression != null) {
builder.setNodeLabelExpression(nodeLabelExpression);
}
proto = builder.build(); proto = builder.build();
} }
@ -187,6 +194,16 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
containerId); containerId);
} }
/**
* Get the node-label-expression in the original ResourceRequest
*/
public String getNodeLabelExpression() {
if (proto.hasNodeLabelExpression()) {
return proto.getNodeLabelExpression();
}
return CommonNodeLabelsManager.NO_LABEL;
}
// TODO: Needed? // TODO: Needed?
@InterfaceAudience.Private @InterfaceAudience.Private
public static class Renewer extends Token.TrivialRenewer { public static class Renewer extends Token.TrivialRenewer {

View File

@ -49,6 +49,7 @@ message ContainerTokenIdentifierProto {
optional PriorityProto priority = 8; optional PriorityProto priority = 8;
optional int64 creationTime = 9; optional int64 creationTime = 9;
optional LogAggregationContextProto logAggregationContext = 10; optional LogAggregationContextProto logAggregationContext = 10;
optional string nodeLabelExpression = 11;
} }
message ClientToAMTokenIdentifierProto { message ClientToAMTokenIdentifierProto {

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
@ -32,10 +33,20 @@ import org.apache.hadoop.yarn.util.Records;
*/ */
public abstract class NMContainerStatus { public abstract class NMContainerStatus {
// Used by tests only
public static NMContainerStatus newInstance(ContainerId containerId, public static NMContainerStatus newInstance(ContainerId containerId,
ContainerState containerState, Resource allocatedResource, ContainerState containerState, Resource allocatedResource,
String diagnostics, int containerExitStatus, Priority priority, String diagnostics, int containerExitStatus, Priority priority,
long creationTime) { long creationTime) {
return newInstance(containerId, containerState, allocatedResource,
diagnostics, containerExitStatus, priority, creationTime,
CommonNodeLabelsManager.NO_LABEL);
}
public static NMContainerStatus newInstance(ContainerId containerId,
ContainerState containerState, Resource allocatedResource,
String diagnostics, int containerExitStatus, Priority priority,
long creationTime, String nodeLabelExpression) {
NMContainerStatus status = NMContainerStatus status =
Records.newRecord(NMContainerStatus.class); Records.newRecord(NMContainerStatus.class);
status.setContainerId(containerId); status.setContainerId(containerId);
@ -45,6 +56,7 @@ public abstract class NMContainerStatus {
status.setContainerExitStatus(containerExitStatus); status.setContainerExitStatus(containerExitStatus);
status.setPriority(priority); status.setPriority(priority);
status.setCreationTime(creationTime); status.setCreationTime(creationTime);
status.setNodeLabelExpression(nodeLabelExpression);
return status; return status;
} }
@ -105,4 +117,12 @@ public abstract class NMContainerStatus {
public abstract long getCreationTime(); public abstract long getCreationTime();
public abstract void setCreationTime(long creationTime); public abstract void setCreationTime(long creationTime);
/**
* Get the node-label-expression in the original ResourceRequest
*/
public abstract String getNodeLabelExpression();
public abstract void setNodeLabelExpression(
String nodeLabelExpression);
} }

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
@ -208,6 +209,25 @@ public class NMContainerStatusPBImpl extends NMContainerStatus {
builder.setCreationTime(creationTime); builder.setCreationTime(creationTime);
} }
@Override
public String getNodeLabelExpression() {
NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
if (p.hasNodeLabelExpression()) {
return p.getNodeLabelExpression();
}
return CommonNodeLabelsManager.NO_LABEL;
}
@Override
public void setNodeLabelExpression(String nodeLabelExpression) {
maybeInitBuilder();
if (nodeLabelExpression == null) {
builder.clearNodeLabelExpression();
return;
}
builder.setNodeLabelExpression(nodeLabelExpression);
}
private void mergeLocalToBuilder() { private void mergeLocalToBuilder() {
if (this.containerId != null if (this.containerId != null
&& !((ContainerIdPBImpl) containerId).getProto().equals( && !((ContainerIdPBImpl) containerId).getProto().equals(
@ -274,5 +294,4 @@ public class NMContainerStatusPBImpl extends NMContainerStatus {
private PriorityProto convertToProtoFormat(Priority t) { private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl)t).getProto(); return ((PriorityPBImpl)t).getProto();
} }
} }

View File

@ -92,6 +92,7 @@ message NMContainerStatusProto {
optional string diagnostics = 5 [default = "N/A"]; optional string diagnostics = 5 [default = "N/A"];
optional int32 container_exit_status = 6; optional int32 container_exit_status = 6;
optional int64 creation_time = 7; optional int64 creation_time = 7;
optional string nodeLabelExpression = 8;
} }
message SCMUploaderNotifyRequestProto { message SCMUploaderNotifyRequestProto {

View File

@ -434,7 +434,8 @@ public class ContainerImpl implements Container {
return NMContainerStatus.newInstance(this.containerId, getCurrentState(), return NMContainerStatus.newInstance(this.containerId, getCurrentState(),
getResource(), diagnostics.toString(), exitCode, getResource(), diagnostics.toString(), exitCode,
containerTokenIdentifier.getPriority(), containerTokenIdentifier.getPriority(),
containerTokenIdentifier.getCreationTime()); containerTokenIdentifier.getCreationTime(),
containerTokenIdentifier.getNodeLabelExpression());
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
} }

View File

@ -809,7 +809,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
ContainerTokenIdentifier containerTokenIdentifier = ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, nodeId.toString(), user, r, new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
System.currentTimeMillis() + 100000L, 123, rmIdentifier, System.currentTimeMillis() + 100000L, 123, rmIdentifier,
Priority.newInstance(0), 0, logAggregationContext); Priority.newInstance(0), 0, logAggregationContext, null);
Token containerToken = Token containerToken =
BuilderUtils BuilderUtils
.newContainerToken(nodeId, containerTokenSecretManager .newContainerToken(nodeId, containerTokenSecretManager

View File

@ -80,4 +80,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
List<ResourceRequest> getResourceRequests(); List<ResourceRequest> getResourceRequests();
String getNodeHttpAddress(); String getNodeHttpAddress();
String getNodeLabelExpression();
} }

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
@ -153,6 +154,7 @@ public class RMContainerImpl implements RMContainer {
private final EventHandler eventHandler; private final EventHandler eventHandler;
private final ContainerAllocationExpirer containerAllocationExpirer; private final ContainerAllocationExpirer containerAllocationExpirer;
private final String user; private final String user;
private final String nodeLabelExpression;
private Resource reservedResource; private Resource reservedResource;
private NodeId reservedNode; private NodeId reservedNode;
@ -167,12 +169,19 @@ public class RMContainerImpl implements RMContainer {
ApplicationAttemptId appAttemptId, NodeId nodeId, String user, ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext) { RMContext rmContext) {
this(container, appAttemptId, nodeId, user, rmContext, System this(container, appAttemptId, nodeId, user, rmContext, System
.currentTimeMillis()); .currentTimeMillis(), "");
} }
public RMContainerImpl(Container container, public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
String user, RMContext rmContext, long creationTime) { RMContext rmContext, String nodeLabelExpression) {
this(container, appAttemptId, nodeId, user, rmContext, System
.currentTimeMillis(), nodeLabelExpression);
}
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext, long creationTime, String nodeLabelExpression) {
this.stateMachine = stateMachineFactory.make(this); this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId(); this.containerId = container.getId();
this.nodeId = nodeId; this.nodeId = nodeId;
@ -185,6 +194,7 @@ public class RMContainerImpl implements RMContainer {
this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
this.isAMContainer = false; this.isAMContainer = false;
this.resourceRequests = null; this.resourceRequests = null;
this.nodeLabelExpression = nodeLabelExpression;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock(); this.readLock = lock.readLock();
@ -597,4 +607,12 @@ public class RMContainerImpl implements RMContainer {
readLock.unlock(); readLock.unlock();
} }
} }
@Override
public String getNodeLabelExpression() {
if (nodeLabelExpression == null) {
return RMNodeLabelsManager.NO_LABEL;
}
return nodeLabelExpression;
}
} }

View File

@ -408,7 +408,7 @@ public abstract class AbstractYarnScheduler
RMContainer rmContainer = RMContainer rmContainer =
new RMContainerImpl(container, attemptId, node.getNodeID(), new RMContainerImpl(container, attemptId, node.getNodeID(),
applications.get(attemptId.getApplicationId()).getUser(), rmContext, applications.get(attemptId.getApplicationId()).getUser(), rmContext,
status.getCreationTime()); status.getCreationTime(), status.getNodeLabelExpression());
return rmContainer; return rmContainer;
} }

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Scheduli
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset; import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset; import com.google.common.collect.Multiset;
@ -468,7 +469,8 @@ public class SchedulerApplicationAttempt {
container.setContainerToken(rmContext.getContainerTokenSecretManager() container.setContainerToken(rmContext.getContainerTokenSecretManager()
.createContainerToken(container.getId(), container.getNodeId(), .createContainerToken(container.getId(), container.getNodeId(),
getUser(), container.getResource(), container.getPriority(), getUser(), container.getResource(), container.getPriority(),
rmContainer.getCreationTime(), this.logAggregationContext)); rmContainer.getCreationTime(), this.logAggregationContext,
rmContainer.getNodeLabelExpression()));
NMToken nmToken = NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container); getApplicationAttemptId(), container);
@ -703,4 +705,9 @@ public class SchedulerApplicationAttempt {
this.attemptResourceUsage, nodePartition, cluster, this.attemptResourceUsage, nodePartition, cluster,
schedulingMode); schedulingMode);
} }
@VisibleForTesting
public ResourceUsage getAppAttemptResourceUsage() {
return this.attemptResourceUsage;
}
} }

View File

@ -146,9 +146,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
} }
// Create RMContainer // Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container, this RMContainer rmContainer =
.getApplicationAttemptId(), node.getNodeID(), new RMContainerImpl(container, this.getApplicationAttemptId(),
appSchedulingInfo.getUser(), this.rmContext); node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
request.getNodeLabelExpression());
// Add it to allContainers list. // Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer); newlyAllocatedContainers.add(rmContainer);

View File

@ -179,7 +179,7 @@ public class RMContainerTokenSecretManager extends
String appSubmitter, Resource capability, Priority priority, String appSubmitter, Resource capability, Priority priority,
long createTime) { long createTime) {
return createContainerToken(containerId, nodeId, appSubmitter, capability, return createContainerToken(containerId, nodeId, appSubmitter, capability,
priority, createTime, null); priority, createTime, null, null);
} }
/** /**
@ -196,7 +196,8 @@ public class RMContainerTokenSecretManager extends
*/ */
public Token createContainerToken(ContainerId containerId, NodeId nodeId, public Token createContainerToken(ContainerId containerId, NodeId nodeId,
String appSubmitter, Resource capability, Priority priority, String appSubmitter, Resource capability, Priority priority,
long createTime, LogAggregationContext logAggregationContext) { long createTime, LogAggregationContext logAggregationContext,
String nodeLabelExpression) {
byte[] password; byte[] password;
ContainerTokenIdentifier tokenIdentifier; ContainerTokenIdentifier tokenIdentifier;
long expiryTimeStamp = long expiryTimeStamp =
@ -210,7 +211,7 @@ public class RMContainerTokenSecretManager extends
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
.getMasterKey().getKeyId(), .getMasterKey().getKeyId(),
ResourceManager.getClusterTimeStamp(), priority, createTime, ResourceManager.getClusterTimeStamp(), priority, createTime,
logAggregationContext); logAggregationContext, nodeLabelExpression);
password = this.createPassword(tokenIdentifier); password = this.createPassword(tokenIdentifier);
} finally { } finally {

View File

@ -1987,11 +1987,18 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
public static NMContainerStatus createNMContainerStatus( public static NMContainerStatus createNMContainerStatus(
ApplicationAttemptId appAttemptId, int id, ContainerState containerState) { ApplicationAttemptId appAttemptId, int id, ContainerState containerState) {
return createNMContainerStatus(appAttemptId, id, containerState,
RMNodeLabelsManager.NO_LABEL);
}
public static NMContainerStatus createNMContainerStatus(
ApplicationAttemptId appAttemptId, int id, ContainerState containerState,
String nodeLabelExpression) {
ContainerId containerId = ContainerId.newContainerId(appAttemptId, id); ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
NMContainerStatus containerReport = NMContainerStatus containerReport =
NMContainerStatus.newInstance(containerId, containerState, NMContainerStatus.newInstance(containerId, containerState,
Resource.newInstance(1024, 1), "recover container", 0, Resource.newInstance(1024, 1), "recover container", 0,
Priority.newInstance(0), 0); Priority.newInstance(0), 0, nodeLabelExpression);
return containerReport; return containerReport;
} }

View File

@ -293,10 +293,11 @@ public class TestContainerAllocation {
public Token createContainerToken(ContainerId containerId, public Token createContainerToken(ContainerId containerId,
NodeId nodeId, String appSubmitter, Resource capability, NodeId nodeId, String appSubmitter, Resource capability,
Priority priority, long createTime, Priority priority, long createTime,
LogAggregationContext logAggregationContext) { LogAggregationContext logAggregationContext, String nodeLabelExp) {
numRetries++; numRetries++;
return super.createContainerToken(containerId, nodeId, appSubmitter, return super.createContainerToken(containerId, nodeId, appSubmitter,
capability, priority, createTime, logAggregationContext); capability, priority, createTime, logAggregationContext,
nodeLabelExp);
} }
}; };
} }

View File

@ -0,0 +1,282 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
public class TestWorkPreservingRMRestartForNodeLabel {
private Configuration conf;
private static final int GB = 1024; // 1024 MB
RMNodeLabelsManager mgr;
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
}
@SuppressWarnings("unchecked")
private <E> Set<E> toSet(E... elements) {
Set<E> set = Sets.newHashSet(elements);
return set;
}
private void checkRMContainerLabelExpression(ContainerId containerId,
MockRM rm, String labelExpression) {
RMContainer container =
rm.getRMContext().getScheduler().getRMContainer(containerId);
Assert.assertNotNull("Cannot find RMContainer=" + containerId, container);
Assert.assertEquals(labelExpression,
container.getNodeLabelExpression());
}
@SuppressWarnings("rawtypes")
public static void waitForNumContainersToRecover(int num, MockRM rm,
ApplicationAttemptId attemptId) throws Exception {
AbstractYarnScheduler scheduler =
(AbstractYarnScheduler) rm.getResourceScheduler();
SchedulerApplicationAttempt attempt =
scheduler.getApplicationAttempt(attemptId);
while (attempt == null) {
System.out.println("Wait for scheduler attempt " + attemptId
+ " to be created");
Thread.sleep(200);
attempt = scheduler.getApplicationAttempt(attemptId);
}
while (attempt.getLiveContainers().size() < num) {
System.out.println("Wait for " + num
+ " containers to recover. currently: "
+ attempt.getLiveContainers().size());
Thread.sleep(200);
}
}
private void checkAppResourceUsage(String partition, ApplicationId appId,
MockRM rm, int expectedMemUsage) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
FiCaSchedulerApp app =
cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
Assert.assertEquals(expectedMemUsage, app.getAppAttemptResourceUsage()
.getUsed(partition).getMemory());
}
private void checkQueueResourceUsage(String partition, String queueName, MockRM rm, int expectedMemUsage) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
CSQueue queue = cs.getQueue(queueName);
Assert.assertEquals(expectedMemUsage, queue.getQueueResourceUsage()
.getUsed(partition).getMemory());
}
@Test
public void testWorkPreservingRestartForNodeLabel() throws Exception {
// This test is pretty much similar to testContainerAllocateWithLabel.
// Difference is, this test doesn't specify label expression in ResourceRequest,
// instead, it uses default queue label expression
// set node -> label
mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
NodeId.newInstance("h2", 0), toSet("y")));
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
conf = TestUtils.getConfigurationWithDefaultQueueLabels(conf);
// inject node label manager
MockRM rm1 =
new MockRM(conf,
memStore) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
ContainerId containerId;
// launch an app to queue a1 (label = x), and check all container will
// be allocated in h1
RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// request a container.
am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
containerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
Assert.assertTrue(rm1.waitForState(nm1, containerId,
RMContainerState.ALLOCATED, 10 * 1000));
checkRMContainerLabelExpression(ContainerId.newContainerId(
am1.getApplicationAttemptId(), 1), rm1, "x");
checkRMContainerLabelExpression(ContainerId.newContainerId(
am1.getApplicationAttemptId(), 2), rm1, "x");
// launch an app to queue b1 (label = y), and check all container will
// be allocated in h2
RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// request a container.
am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
Assert.assertTrue(rm1.waitForState(nm2, containerId,
RMContainerState.ALLOCATED, 10 * 1000));
checkRMContainerLabelExpression(ContainerId.newContainerId(
am2.getApplicationAttemptId(), 1), rm1, "y");
checkRMContainerLabelExpression(ContainerId.newContainerId(
am2.getApplicationAttemptId(), 2), rm1, "y");
// launch an app to queue c1 (label = ""), and check all container will
// be allocated in h3
RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
// request a container.
am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
Assert.assertTrue(rm1.waitForState(nm3, containerId,
RMContainerState.ALLOCATED, 10 * 1000));
checkRMContainerLabelExpression(ContainerId.newContainerId(
am3.getApplicationAttemptId(), 1), rm1, "");
checkRMContainerLabelExpression(ContainerId.newContainerId(
am3.getApplicationAttemptId(), 2), rm1, "");
// Re-start RM
mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
NodeId.newInstance("h2", 0), toSet("y")));
MockRM rm2 =
new MockRM(conf,
memStore) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm2.setResourceTrackerService(rm2.getResourceTrackerService());
nm3.setResourceTrackerService(rm2.getResourceTrackerService());
// recover app
NMContainerStatus app1c1 =
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1,
ContainerState.RUNNING, "x");
NMContainerStatus app1c2 =
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
ContainerState.RUNNING, "x");
nm1.registerNode(Arrays.asList(app1c1, app1c2), null);
waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
checkRMContainerLabelExpression(ContainerId.newContainerId(
am1.getApplicationAttemptId(), 1), rm1, "x");
checkRMContainerLabelExpression(ContainerId.newContainerId(
am1.getApplicationAttemptId(), 2), rm1, "x");
NMContainerStatus app2c1 =
TestRMRestart.createNMContainerStatus(am2.getApplicationAttemptId(), 1,
ContainerState.RUNNING, "y");
NMContainerStatus app2c2 =
TestRMRestart.createNMContainerStatus(am2.getApplicationAttemptId(), 2,
ContainerState.RUNNING, "y");
nm2.registerNode(Arrays.asList(app2c1, app2c2), null);
waitForNumContainersToRecover(2, rm2, am2.getApplicationAttemptId());
checkRMContainerLabelExpression(ContainerId.newContainerId(
am2.getApplicationAttemptId(), 1), rm1, "y");
checkRMContainerLabelExpression(ContainerId.newContainerId(
am2.getApplicationAttemptId(), 2), rm1, "y");
NMContainerStatus app3c1 =
TestRMRestart.createNMContainerStatus(am3.getApplicationAttemptId(), 1,
ContainerState.RUNNING, "");
NMContainerStatus app3c2 =
TestRMRestart.createNMContainerStatus(am3.getApplicationAttemptId(), 2,
ContainerState.RUNNING, "");
nm3.registerNode(Arrays.asList(app3c1, app3c2), null);
waitForNumContainersToRecover(2, rm2, am3.getApplicationAttemptId());
checkRMContainerLabelExpression(ContainerId.newContainerId(
am3.getApplicationAttemptId(), 1), rm1, "");
checkRMContainerLabelExpression(ContainerId.newContainerId(
am3.getApplicationAttemptId(), 2), rm1, "");
// Check recovered resource usage
checkAppResourceUsage("x", app1.getApplicationId(), rm1, 2 * GB);
checkAppResourceUsage("y", app2.getApplicationId(), rm1, 2 * GB);
checkAppResourceUsage("", app3.getApplicationId(), rm1, 2 * GB);
checkQueueResourceUsage("x", "a1", rm1, 2 * GB);
checkQueueResourceUsage("y", "b1", rm1, 2 * GB);
checkQueueResourceUsage("", "c1", rm1, 2 * GB);
checkQueueResourceUsage("x", "a", rm1, 2 * GB);
checkQueueResourceUsage("y", "b", rm1, 2 * GB);
checkQueueResourceUsage("", "c", rm1, 2 * GB);
checkQueueResourceUsage("x", "root", rm1, 2 * GB);
checkQueueResourceUsage("y", "root", rm1, 2 * GB);
checkQueueResourceUsage("", "root", rm1, 2 * GB);
rm1.close();
rm2.close();
}
}