YARN-7522. Introduce AllocationTagsManager to associate allocation tags to nodes. (Wangda Tan via asuresh)

This commit is contained in:
Arun Suresh 2017-12-08 00:24:00 -08:00
parent 69de9a1ba9
commit 801c0988b5
13 changed files with 1033 additions and 0 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@ -107,6 +108,7 @@ public class RMActiveServiceContext {
private RMAppLifetimeMonitor rmAppLifetimeMonitor;
private QueueLimitCalculator queueLimitCalculator;
private AllocationTagsManager allocationTagsManager;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
@ -396,6 +398,19 @@ public class RMActiveServiceContext {
nodeLabelManager = mgr;
}
@Private
@Unstable
public AllocationTagsManager getAllocationTagsManager() {
return allocationTagsManager;
}
@Private
@Unstable
public void setAllocationTagsManager(
AllocationTagsManager allocationTagsManager) {
this.allocationTagsManager = allocationTagsManager;
}
@Private
@Unstable
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@ -166,4 +167,8 @@ public interface RMContext extends ApplicationMasterServiceContext {
void setResourceProfilesManager(ResourceProfilesManager mgr);
String getAppProxyUrl(Configuration conf, ApplicationId applicationId);
AllocationTagsManager getAllocationTagsManager();
void setAllocationTagsManager(AllocationTagsManager allocationTagsManager);
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@ -503,6 +504,17 @@ public class RMContextImpl implements RMContext {
activeServiceContext.setNodeLabelManager(mgr);
}
@Override
public AllocationTagsManager getAllocationTagsManager() {
return activeServiceContext.getAllocationTagsManager();
}
@Override
public void setAllocationTagsManager(
AllocationTagsManager allocationTagsManager) {
activeServiceContext.setAllocationTagsManager(allocationTagsManager);
}
@Override
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
return activeServiceContext.getRMDelegatedNodeLabelsUpdater();

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Pu
import org.apache.hadoop.yarn.server.resourcemanager.metrics.CombinedSystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@ -493,6 +494,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
throws InstantiationException, IllegalAccessException {
return new RMNodeLabelsManager();
}
protected AllocationTagsManager createAllocationTagsManager() {
return new AllocationTagsManager();
}
protected DelegationTokenRenewer createDelegationTokenRenewer() {
return new DelegationTokenRenewer();
@ -619,6 +624,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
addService(nlm);
rmContext.setNodeLabelManager(nlm);
AllocationTagsManager allocationTagsManager =
createAllocationTagsManager();
rmContext.setAllocationTagsManager(allocationTagsManager);
RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
createRMDelegatedNodeLabelsUpdater();
if (delegatedNodeLabelsUpdater != null) {

View File

@ -0,0 +1,431 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.resourcemanager.constraint;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.log4j.Logger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongBinaryOperator;
/**
* Support storing maps between container-tags/applications and
* nodes. This will be required by affinity/anti-affinity implementation and
* cardinality.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class AllocationTagsManager {
private static final Logger LOG = Logger.getLogger(
AllocationTagsManager.class);
private ReentrantReadWriteLock.ReadLock readLock;
private ReentrantReadWriteLock.WriteLock writeLock;
// Application's tags to node
private Map<ApplicationId, NodeToCountedTags> perAppMappings =
new HashMap<>();
// Global tags to node mapping (used to fast return aggregated tags
// cardinality across apps)
private NodeToCountedTags globalMapping = new NodeToCountedTags();
/**
* Store node to counted tags.
*/
@VisibleForTesting
static class NodeToCountedTags {
// Map<NodeId, Map<Tag, Count>>
private Map<NodeId, Map<String, Long>> nodeToTagsWithCount =
new HashMap<>();
// protected by external locks
private void addTagsToNode(NodeId nodeId, Set<String> tags) {
Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
k -> new HashMap<>());
for (String tag : tags) {
Long count = innerMap.get(tag);
if (count == null) {
innerMap.put(tag, 1L);
} else{
innerMap.put(tag, count + 1);
}
}
}
// protected by external locks
private void addTagToNode(NodeId nodeId, String tag) {
Map<String, Long> innerMap = nodeToTagsWithCount.computeIfAbsent(nodeId,
k -> new HashMap<>());
Long count = innerMap.get(tag);
if (count == null) {
innerMap.put(tag, 1L);
} else{
innerMap.put(tag, count + 1);
}
}
private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) {
Long count = innerMap.get(tag);
if (count > 1) {
innerMap.put(tag, count - 1);
} else {
if (count <= 0) {
LOG.warn(
"Trying to remove tags from node, however the count already"
+ " becomes 0 or less, it could be a potential bug.");
}
innerMap.remove(tag);
}
}
private void removeTagsFromNode(NodeId nodeId, Set<String> tags) {
Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
if (innerMap == null) {
LOG.warn("Failed to find node=" + nodeId
+ " while trying to remove tags, please double check.");
return;
}
for (String tag : tags) {
removeTagFromInnerMap(innerMap, tag);
}
if (innerMap.isEmpty()) {
nodeToTagsWithCount.remove(nodeId);
}
}
private void removeTagFromNode(NodeId nodeId, String tag) {
Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
if (innerMap == null) {
LOG.warn("Failed to find node=" + nodeId
+ " while trying to remove tags, please double check.");
return;
}
removeTagFromInnerMap(innerMap, tag);
if (innerMap.isEmpty()) {
nodeToTagsWithCount.remove(nodeId);
}
}
private long getCardinality(NodeId nodeId, String tag) {
Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
if (innerMap == null) {
return 0;
}
Long value = innerMap.get(tag);
return value == null ? 0 : value;
}
private long getCardinality(NodeId nodeId, Set<String> tags,
LongBinaryOperator op) {
Map<String, Long> innerMap = nodeToTagsWithCount.get(nodeId);
if (innerMap == null) {
return 0;
}
long returnValue = 0;
boolean firstTag = true;
if (tags != null && !tags.isEmpty()) {
for (String tag : tags) {
Long value = innerMap.get(tag);
if (value == null) {
value = 0L;
}
if (firstTag) {
returnValue = value;
firstTag = false;
continue;
}
returnValue = op.applyAsLong(returnValue, value);
}
} else {
// Similar to above if, but only iterate values for better performance
for (long value : innerMap.values()) {
// For the first value, we will not apply op
if (firstTag) {
returnValue = value;
firstTag = false;
continue;
}
returnValue = op.applyAsLong(returnValue, value);
}
}
return returnValue;
}
private boolean isEmpty() {
return nodeToTagsWithCount.isEmpty();
}
@VisibleForTesting
public Map<NodeId, Map<String, Long>> getNodeToTagsWithCount() {
return nodeToTagsWithCount;
}
}
@VisibleForTesting
Map<ApplicationId, NodeToCountedTags> getPerAppMappings() {
return perAppMappings;
}
@VisibleForTesting
NodeToCountedTags getGlobalMapping() {
return globalMapping;
}
public AllocationTagsManager() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
/**
* Notify container allocated on a node.
*
* @param nodeId allocated node.
* @param applicationId applicationId
* @param containerId container id.
* @param allocationTags allocation tags, see
* {@link SchedulingRequest#getAllocationTags()}
* application_id will be added to allocationTags.
*/
public void addContainer(NodeId nodeId, ApplicationId applicationId,
ContainerId containerId, Set<String> allocationTags) {
String applicationIdTag =
AllocationTagsNamespaces.APP_ID + applicationId.toString();
boolean useSet = false;
if (allocationTags != null && !allocationTags.isEmpty()) {
// Copy before edit it.
allocationTags = new HashSet<>(allocationTags);
allocationTags.add(applicationIdTag);
useSet = true;
}
writeLock.lock();
try {
NodeToCountedTags perAppTagsMapping = perAppMappings.computeIfAbsent(
applicationId, k -> new NodeToCountedTags());
if (useSet) {
perAppTagsMapping.addTagsToNode(nodeId, allocationTags);
globalMapping.addTagsToNode(nodeId, allocationTags);
} else {
perAppTagsMapping.addTagToNode(nodeId, applicationIdTag);
globalMapping.addTagToNode(nodeId, applicationIdTag);
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Added container=" + containerId + " with tags=[" + StringUtils
.join(allocationTags, ",") + "]");
}
} finally {
writeLock.unlock();
}
}
/**
* Notify container removed.
*
* @param nodeId nodeId
* @param applicationId applicationId
* @param containerId containerId.
* @param allocationTags allocation tags for given container
*/
public void removeContainer(NodeId nodeId, ApplicationId applicationId,
ContainerId containerId, Set<String> allocationTags) {
String applicationIdTag =
AllocationTagsNamespaces.APP_ID + applicationId.toString();
boolean useSet = false;
if (allocationTags != null && !allocationTags.isEmpty()) {
// Copy before edit it.
allocationTags = new HashSet<>(allocationTags);
allocationTags.add(applicationIdTag);
useSet = true;
}
writeLock.lock();
try {
NodeToCountedTags perAppTagsMapping = perAppMappings.get(applicationId);
if (perAppTagsMapping == null) {
return;
}
if (useSet) {
perAppTagsMapping.removeTagsFromNode(nodeId, allocationTags);
globalMapping.removeTagsFromNode(nodeId, allocationTags);
} else {
perAppTagsMapping.removeTagFromNode(nodeId, applicationIdTag);
globalMapping.removeTagFromNode(nodeId, applicationIdTag);
}
if (perAppTagsMapping.isEmpty()) {
perAppMappings.remove(applicationId);
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Removed container=" + containerId + " with tags=[" + StringUtils
.join(allocationTags, ",") + "]");
}
} finally {
writeLock.unlock();
}
}
/**
* Get cardinality for following conditions. External can pass-in a binary op
* to implement customized logic. *
* @param nodeId nodeId, required.
* @param applicationId applicationId. When null is specified, return
* aggregated cardinality among all nodes.
* @param tag allocation tag, see
* {@link SchedulingRequest#getAllocationTags()},
* When multiple tags specified. Returns cardinality
* depends on op. If a specified tag doesn't exist,
* 0 will be its cardinality.
* When null/empty tags specified, all tags
* (of the node/app) will be considered.
* @return cardinality of specified query on the node.
* @throws InvalidAllocationTagsQueryException when illegal query
* parameter specified
*/
public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId,
String tag) throws InvalidAllocationTagsQueryException {
readLock.lock();
try {
if (nodeId == null) {
throw new InvalidAllocationTagsQueryException(
"Must specify nodeId/tags/op to query cardinality");
}
NodeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppMappings.get(applicationId);
} else{
mapping = globalMapping;
}
if (mapping == null) {
return 0;
}
return mapping.getCardinality(nodeId, tag);
} finally {
readLock.unlock();
}
}
/**
* Check if given tag exists on node.
*
* @param nodeId nodeId, required.
* @param applicationId applicationId. When null is specified, return
* aggregated cardinality among all nodes.
* @param tag allocation tag, see
* {@link SchedulingRequest#getAllocationTags()},
* When multiple tags specified. Returns cardinality
* depends on op. If a specified tag doesn't exist,
* 0 will be its cardinality.
* When null/empty tags specified, all tags
* (of the node/app) will be considered.
* @return cardinality of specified query on the node.
* @throws InvalidAllocationTagsQueryException when illegal query
* parameter specified
*/
public boolean allocationTagExistsOnNode(NodeId nodeId,
ApplicationId applicationId, String tag)
throws InvalidAllocationTagsQueryException {
return getNodeCardinality(nodeId, applicationId, tag) > 0;
}
/**
* Get cardinality for following conditions. External can pass-in a binary op
* to implement customized logic.
*
* @param nodeId nodeId, required.
* @param applicationId applicationId. When null is specified, return
* aggregated cardinality among all nodes.
* @param tags allocation tags, see
* {@link SchedulingRequest#getAllocationTags()},
* When multiple tags specified. Returns cardinality
* depends on op. If a specified tag doesn't exist, 0
* will be its cardinality. When null/empty tags
* specified, all tags (of the node/app) will be
* considered.
* @param op operator. Such as Long::max, Long::sum, etc. Required.
* This sparameter only take effect when #values >= 2.
* @return cardinality of specified query on the node.
* @throws InvalidAllocationTagsQueryException when illegal query
* parameter specified
*/
public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId,
Set<String> tags, LongBinaryOperator op)
throws InvalidAllocationTagsQueryException {
readLock.lock();
try {
if (nodeId == null || op == null) {
throw new InvalidAllocationTagsQueryException(
"Must specify nodeId/tags/op to query cardinality");
}
NodeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppMappings.get(applicationId);
} else{
mapping = globalMapping;
}
if (mapping == null) {
return 0;
}
return mapping.getCardinality(nodeId, tags, op);
} finally {
readLock.unlock();
}
}
}

View File

@ -0,0 +1,31 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.resourcemanager.constraint;
/**
* Predefined namespaces for tags
*
* Same as namespace of resource types. Namespaces of placement tags are start
* with alphabets and ended with "/"
*/
public class AllocationTagsNamespaces {
public static final String APP_ID = "yarn_app_id/";
}

View File

@ -0,0 +1,35 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.resourcemanager.constraint;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* Exception when invalid parameter specified to do placement tags related
* queries.
*/
public class InvalidAllocationTagsQueryException extends YarnException {
private static final long serialVersionUID = 12312831974894L;
public InvalidAllocationTagsQueryException(String msg) {
super(msg);
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@ -115,4 +117,10 @@ public interface RMContainer extends EventHandler<RMContainerEvent>,
boolean completed();
NodeId getNodeId();
/**
* Return {@link SchedulingRequest#getAllocationTags()} specified by AM.
* @return allocation tags, could be null/empty
*/
Set<String> getAllocationTags();
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -189,6 +190,9 @@ public class RMContainerImpl implements RMContainer {
private boolean isExternallyAllocated;
private SchedulerRequestKey allocatedSchedulerKey;
// TODO, set it when container allocated by scheduler (From SchedulingRequest)
private Set<String> allocationTags = null;
public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext) {
@ -501,6 +505,11 @@ public class RMContainerImpl implements RMContainer {
return nodeId;
}
@Override
public Set<String> getAllocationTags() {
return allocationTags;
}
private static class BaseTransition implements
SingleArcTransition<RMContainerImpl, RMContainerEvent> {
@ -565,6 +574,12 @@ public class RMContainerImpl implements RMContainer {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Notify placementManager
container.rmContext.getAllocationTagsManager().addContainer(
container.getNodeId(),
container.getApplicationAttemptId().getApplicationId(),
container.getContainerId(), container.getAllocationTags());
container.eventHandler.handle(new RMAppAttemptEvent(
container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
}
@ -676,6 +691,12 @@ public class RMContainerImpl implements RMContainer {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Notify placementManager
container.rmContext.getAllocationTagsManager().removeContainer(
container.getNodeId(),
container.getApplicationAttemptId().getApplicationId(),
container.getContainerId(), container.getAllocationTags());
RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
container.finishTime = System.currentTimeMillis();

View File

@ -0,0 +1,328 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.resourcemanager.constraint;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.junit.Assert;
import org.junit.Test;
/**
* Test functionality of AllocationTagsManager.
*/
public class TestAllocationTagsManager {
@Test
public void testAllocationTagsManagerSimpleCases()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager atm = new AllocationTagsManager();
/**
* Construct test case:
* Node1:
* container_1_1 (mapper/reducer/app_1)
* container_1_3 (service/app_1)
*
* Node2:
* container_1_2 (mapper/reducer/app_1)
* container_1_4 (reducer/app_1)
* container_2_1 (service/app_2)
*/
// 3 Containers from app1
atm.addContainer(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
// 1 Container from app2
atm.addContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Get Cardinality of app1 on node1, with tag "mapper"
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::max));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::min));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
Assert.assertEquals(2,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::max));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
Assert.assertEquals(3,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::sum));
// Get Cardinality by passing single tag.
Assert.assertEquals(1,
atm.getNodeCardinality(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), "mapper"));
Assert.assertEquals(2,
atm.getNodeCardinality(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), "reducer"));
// Get Cardinality of app1 on node2, with tag "no_existed/reducer", op=min
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("no_existed", "reducer"), Long::min));
// Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
// (Expect this returns #containers from app1 on node2)
Assert.assertEquals(2,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), ImmutableSet
.of(AllocationTagsNamespaces.APP_ID + TestUtils
.getMockApplicationId(1).toString()), Long::max));
// Get Cardinality of app1 on node2, with empty tag set, op=max
Assert.assertEquals(2,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
// Get Cardinality of all apps on node2, with empty tag set, op=sum
Assert.assertEquals(7,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
ImmutableSet.of(), Long::sum));
// Get Cardinality of app_1 on node2, with empty tag set, op=sum
Assert.assertEquals(5,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
// Get Cardinality of app_1 on node2, with empty tag set, op=sum
Assert.assertEquals(2,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
// Finish all containers:
atm.removeContainer(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.removeContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
atm.removeContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Expect all cardinality to be 0
// Get Cardinality of app1 on node1, with tag "mapper"
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::max));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=min
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::min));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=max
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::max));
// Get Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"), Long::sum));
// Get Cardinality of app1 on node2, with tag "<applicationId>", op=max
// (Expect this returns #containers from app1 on node2)
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of(TestUtils.getMockApplicationId(1).toString()),
Long::max));
Assert.assertEquals(0,
atm.getNodeCardinality(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1),
TestUtils.getMockApplicationId(1).toString()));
// Get Cardinality of app1 on node2, with empty tag set, op=max
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
// Get Cardinality of all apps on node2, with empty tag set, op=sum
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), null,
ImmutableSet.of(), Long::sum));
// Get Cardinality of app_1 on node2, with empty tag set, op=sum
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
// Get Cardinality of app_1 on node2, with empty tag set, op=sum
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
}
@Test
public void testAllocationTagsManagerMemoryAfterCleanup()
throws InvalidAllocationTagsQueryException {
/**
* Make sure YARN cleans up all memory once container/app finishes.
*/
AllocationTagsManager atm = new AllocationTagsManager();
// Add a bunch of containers
atm.addContainer(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
atm.addContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Remove all these containers
atm.removeContainer(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.removeContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
atm.removeContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Check internal data structure
Assert.assertEquals(0,
atm.getGlobalMapping().getNodeToTagsWithCount().size());
Assert.assertEquals(0, atm.getPerAppMappings().size());
}
@Test
public void testQueryCardinalityWithIllegalParameters()
throws InvalidAllocationTagsQueryException {
/**
* Make sure YARN cleans up all memory once container/app finishes.
*/
AllocationTagsManager atm = new AllocationTagsManager();
// Add a bunch of containers
atm.addContainer(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("node1:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
atm.addContainer(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// No node-id
boolean caughtException = false;
try {
atm.getNodeCardinalityByOp(null, TestUtils.getMockApplicationId(2),
ImmutableSet.of("mapper"), Long::min);
} catch (InvalidAllocationTagsQueryException e) {
caughtException = true;
}
Assert.assertTrue("should fail because of nodeId specified",
caughtException);
// No op
caughtException = false;
try {
atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("mapper"), null);
} catch (InvalidAllocationTagsQueryException e) {
caughtException = true;
}
Assert.assertTrue("should fail because of nodeId specified",
caughtException);
}
}

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -109,6 +110,8 @@ public class TestRMContainerImpl {
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getRMApps()).thenReturn(rmApps);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
AllocationTagsManager ptm = mock(AllocationTagsManager.class);
when(rmContext.getAllocationTagsManager()).thenReturn(ptm);
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
@ -209,6 +212,8 @@ public class TestRMContainerImpl {
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
AllocationTagsManager ptm = mock(AllocationTagsManager.class);
when(rmContext.getAllocationTagsManager()).thenReturn(ptm);
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(
@ -367,4 +372,123 @@ public class TestRMContainerImpl {
verify(publisher, times(1)).containerCreated(any(RMContainer.class), anyLong());
verify(publisher, times(1)).containerFinished(any(RMContainer.class), anyLong());
}
@Test
public void testContainerTransitionNotifyPlacementTagsManager()
throws Exception {
DrainDispatcher drainDispatcher = new DrainDispatcher();
EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(
EventHandler.class);
EventHandler generic = mock(EventHandler.class);
drainDispatcher.register(RMAppAttemptEventType.class,
appAttemptEventHandler);
drainDispatcher.register(RMNodeEventType.class, generic);
drainDispatcher.init(new YarnConfiguration());
drainDispatcher.start();
NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
Resource resource = BuilderUtils.newResource(512, 1);
Priority priority = BuilderUtils.newPriority(5);
Container container = BuilderUtils.newContainer(containerId, nodeId,
"host:3465", resource, priority, null);
ConcurrentMap<ApplicationId, RMApp> rmApps =
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
RMApp rmApp = mock(RMApp.class);
when(rmApp.getRMAppAttempt(Matchers.any())).thenReturn(null);
Mockito.doReturn(rmApp).when(rmApps).get(Matchers.any());
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
AllocationTagsManager tagsManager = new AllocationTagsManager();
RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
when(rmContext.getRMApps()).thenReturn(rmApps);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
when(rmContext.getAllocationTagsManager()).thenReturn(tagsManager);
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
true);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
/* First container: ALLOCATED -> KILLED */
RMContainer rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
nodeId, "user", rmContext);
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
Assert.assertEquals(1,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.KILL));
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
/* Second container: ACQUIRED -> FINISHED */
rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
nodeId, "user", rmContext);
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
Assert.assertEquals(1,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.FINISHED));
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
/* Third container: RUNNING -> FINISHED */
rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
nodeId, "user", rmContext);
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
Assert.assertEquals(1,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.FINISHED));
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
}
}

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -135,6 +136,9 @@ public class TestUtils {
new DefaultResourceCalculator());
rmContext.setScheduler(mockScheduler);
AllocationTagsManager ptm = mock(AllocationTagsManager.class);
rmContext.setAllocationTagsManager(ptm);
return rmContext;
}
@ -234,6 +238,11 @@ public class TestUtils {
doReturn(id).when(containerId).getContainerId();
return containerId;
}
public static ContainerId getMockContainerId(int appId, int containerId) {
ApplicationAttemptId attemptId = getMockApplicationAttemptId(appId, 1);
return ContainerId.newContainerId(attemptId, containerId);
}
public static Container getMockContainer(
ContainerId containerId, NodeId nodeId,

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWri
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -234,6 +235,8 @@ public class TestFifoScheduler {
FifoScheduler scheduler = new FifoScheduler();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler);
AllocationTagsManager ptm = mock(AllocationTagsManager.class);
rmContext.setAllocationTagsManager(ptm);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
rmContext.setRMApplicationHistoryWriter(
mock(RMApplicationHistoryWriter.class));
@ -312,12 +315,14 @@ public class TestFifoScheduler {
FifoScheduler scheduler = new FifoScheduler();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler);
AllocationTagsManager ptm = mock(AllocationTagsManager.class);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));
((RMContextImpl) rmContext).setYarnConfiguration(new YarnConfiguration());
NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
nlm.init(new Configuration());
rmContext.setNodeLabelManager(nlm);
rmContext.setAllocationTagsManager(ptm);
scheduler.setRMContext(rmContext);
((RMContextImpl) rmContext).setScheduler(scheduler);