YARN-7613. Implement Basic algorithm for constraint based placement. (Panagiotis Garefalakis via asuresh)

Arun Suresh 2017-12-27 22:59:22 -08:00
parent f9af15d659
commit a52d11fb8c
15 changed files with 721 additions and 239 deletions


@ -536,6 +536,10 @@ public class YarnConfiguration extends Configuration {
public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS =
RM_PREFIX + "placement-constraints.algorithm.class";
/** Used for BasicPlacementAlgorithm - default SERIAL. **/
public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR =
RM_PREFIX + "placement-constraints.algorithm.iterator";
public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED =
RM_PREFIX + "placement-constraints.enabled";
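To make the new configuration knobs concrete, here is a minimal sketch (not part of the patch) of setting them programmatically through the standard Configuration API; the class name and the POPULAR_TAGS value come from the defaults and the IteratorType enum introduced later in this commit.

YarnConfiguration conf = new YarnConfiguration();
// Enable constraint based placement in the RM.
conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
// Select the algorithm implementation (DefaultPlacementAlgorithm is the new default).
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS,
    "org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint"
        + ".algorithm.DefaultPlacementAlgorithm");
// Select how the algorithm iterates over SchedulingRequests: SERIAL (default) or POPULAR_TAGS.
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR,
    "POPULAR_TAGS");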


@ -145,7 +145,13 @@
<property>
<description>Constraint Placement Algorithm to be used.</description>
<name>yarn.resourcemanager.placement-constraints.algorithm.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SamplePlacementAlgorithm</value>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm</value>
</property>
<property>
<description>Placement Algorithm Requests Iterator to be used.</description>
<name>yarn.resourcemanager.placement-constraints.algorithm.iterator</name>
<value>SERIAL</value>
</property>
<property>


@ -579,9 +579,8 @@ public class RMContainerImpl implements RMContainer {
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Notify placementManager
container.rmContext.getAllocationTagsManager().addContainer(
container.getNodeId(),
container.getApplicationAttemptId().getApplicationId(),
container.getContainerId(), container.getAllocationTags());
container.getNodeId(), container.getContainerId(),
container.getAllocationTags());
container.eventHandler.handle(new RMAppAttemptEvent(
container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
@ -696,9 +695,8 @@ public class RMContainerImpl implements RMContainer {
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Notify placementManager
container.rmContext.getAllocationTagsManager().removeContainer(
container.getNodeId(),
container.getApplicationAttemptId().getApplicationId(),
container.getContainerId(), container.getAllocationTags());
container.getNodeId(), container.getContainerId(),
container.getAllocationTags());
RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;


@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -54,24 +55,27 @@ public class AllocationTagsManager {
private final RMContext rmContext;
// Application's tags to Node
private Map<ApplicationId, NodeToCountedTags> perAppNodeMappings =
private Map<ApplicationId, TypeToCountedTags> perAppNodeMappings =
new HashMap<>();
// Application's tags to Rack
private Map<ApplicationId, NodeToCountedTags> perAppRackMappings =
private Map<ApplicationId, TypeToCountedTags> perAppRackMappings =
new HashMap<>();
// Application's Temporary containers mapping
private Map<ApplicationId, Map<NodeId, Map<ContainerId, Set<String>>>>
appTempMappings = new HashMap<>();
// Global tags to node mapping (used to fast return aggregated tags
// cardinality across apps)
private NodeToCountedTags<NodeId> globalNodeMapping = new NodeToCountedTags();
private TypeToCountedTags<NodeId> globalNodeMapping = new TypeToCountedTags();
// Global tags to Rack mapping
private NodeToCountedTags<String> globalRackMapping = new NodeToCountedTags();
private TypeToCountedTags<String> globalRackMapping = new TypeToCountedTags();
/**
* Generic store mapping a type {@code <T>} to counted tags.
* Currently used for both NodeId-to-(tag, count) and Rack-to-(tag, count) mappings.
*/
@VisibleForTesting
static class NodeToCountedTags<T> {
static class TypeToCountedTags<T> {
// Map<Type, Map<Tag, Count>>
private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
@ -209,25 +213,31 @@ public class AllocationTagsManager {
}
@VisibleForTesting
Map<ApplicationId, NodeToCountedTags> getPerAppNodeMappings() {
Map<ApplicationId, TypeToCountedTags> getPerAppNodeMappings() {
return perAppNodeMappings;
}
@VisibleForTesting
Map<ApplicationId, NodeToCountedTags> getPerAppRackMappings() {
Map<ApplicationId, TypeToCountedTags> getPerAppRackMappings() {
return perAppRackMappings;
}
@VisibleForTesting
NodeToCountedTags getGlobalNodeMapping() {
TypeToCountedTags getGlobalNodeMapping() {
return globalNodeMapping;
}
@VisibleForTesting
NodeToCountedTags getGlobalRackMapping() {
TypeToCountedTags getGlobalRackMapping() {
return globalRackMapping;
}
@VisibleForTesting
public Map<NodeId, Map<ContainerId, Set<String>>> getAppTempMappings(
ApplicationId applicationId) {
return appTempMappings.get(applicationId);
}
public AllocationTagsManager(RMContext context) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
@ -235,18 +245,52 @@ public class AllocationTagsManager {
rmContext = context;
}
//
/**
* Adds a temporary fake-container tag to the Node mapping.
* Used by the constrained placement algorithm to keep track of containers
* that are currently placed on nodes but are not yet allocated.
* @param nodeId node on which the temporary container is placed.
* @param applicationId application that owns the temporary container.
* @param allocationTags allocation tags of the temporary container.
*/
public void addTempContainer(NodeId nodeId, ApplicationId applicationId,
Set<String> allocationTags) {
ContainerId tmpContainer = ContainerId.newContainerId(
ApplicationAttemptId.newInstance(applicationId, 1), System.nanoTime());
writeLock.lock();
try {
Map<NodeId, Map<ContainerId, Set<String>>> appTempMapping =
appTempMappings.computeIfAbsent(applicationId, k -> new HashMap<>());
Map<ContainerId, Set<String>> containerTempMapping =
appTempMapping.computeIfAbsent(nodeId, k -> new HashMap<>());
containerTempMapping.put(tmpContainer, allocationTags);
if (LOG.isDebugEnabled()) {
LOG.debug("Added TEMP container=" + tmpContainer + " with tags=["
+ StringUtils.join(allocationTags, ",") + "]");
}
} finally {
writeLock.unlock();
}
addContainer(nodeId, tmpContainer, allocationTags);
}
/**
* Notify container allocated on a node.
*
* @param nodeId allocated node.
* @param applicationId applicationId
* @param containerId container id.
* @param allocationTags allocation tags, see
* {@link SchedulingRequest#getAllocationTags()}
* application_id will be added to allocationTags.
*/
public void addContainer(NodeId nodeId, ApplicationId applicationId,
ContainerId containerId, Set<String> allocationTags) {
public void addContainer(NodeId nodeId, ContainerId containerId,
Set<String> allocationTags) {
ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId();
String applicationIdTag =
AllocationTagsNamespaces.APP_ID + applicationId.toString();
@ -260,10 +304,10 @@ public class AllocationTagsManager {
writeLock.lock();
try {
NodeToCountedTags perAppTagsMapping = perAppNodeMappings
.computeIfAbsent(applicationId, k -> new NodeToCountedTags());
NodeToCountedTags perAppRackTagsMapping = perAppRackMappings
.computeIfAbsent(applicationId, k -> new NodeToCountedTags());
TypeToCountedTags perAppTagsMapping = perAppNodeMappings
.computeIfAbsent(applicationId, k -> new TypeToCountedTags());
TypeToCountedTags perAppRackTagsMapping = perAppRackMappings
.computeIfAbsent(applicationId, k -> new TypeToCountedTags());
// Covering test-cases where context is mocked
String nodeRack = (rmContext.getRMNodes() != null
&& rmContext.getRMNodes().get(nodeId) != null)
@ -294,12 +338,13 @@ public class AllocationTagsManager {
* Notify container removed.
*
* @param nodeId nodeId
* @param applicationId applicationId
* @param containerId containerId.
* @param allocationTags allocation tags for given container
*/
public void removeContainer(NodeId nodeId, ApplicationId applicationId,
public void removeContainer(NodeId nodeId,
ContainerId containerId, Set<String> allocationTags) {
ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId();
String applicationIdTag =
AllocationTagsNamespaces.APP_ID + applicationId.toString();
boolean useSet = false;
@ -313,9 +358,9 @@ public class AllocationTagsManager {
writeLock.lock();
try {
NodeToCountedTags perAppTagsMapping =
TypeToCountedTags perAppTagsMapping =
perAppNodeMappings.get(applicationId);
NodeToCountedTags perAppRackTagsMapping =
TypeToCountedTags perAppRackTagsMapping =
perAppRackMappings.get(applicationId);
if (perAppTagsMapping == null) {
return;
@ -353,6 +398,34 @@ public class AllocationTagsManager {
}
}
/**
* Removes the temporary containers associated with an application.
* Used by the placement algorithm to clean up temporary tags at the end of
* a placement cycle.
* @param applicationId Application Id.
*/
public void cleanTempContainers(ApplicationId applicationId) {
// Guard: the application may not have recorded any temp containers this cycle.
Map<NodeId, Map<ContainerId, Set<String>>> appTempMapping =
appTempMappings.get(applicationId);
if (appTempMapping != null && !appTempMapping.isEmpty()) {
appTempMapping.entrySet().stream().forEach(nodeE -> {
nodeE.getValue().entrySet().stream().forEach(containerE -> {
removeContainer(nodeE.getKey(), containerE.getKey(),
containerE.getValue());
});
});
writeLock.lock();
try {
appTempMappings.remove(applicationId);
if (LOG.isDebugEnabled()) {
LOG.debug("Removed TEMP containers of app=" + applicationId);
}
} finally {
writeLock.unlock();
}
}
}
/**
* Get Node cardinality for a specific tag.
* When applicationId is null, method returns aggregated cardinality
@ -378,7 +451,7 @@ public class AllocationTagsManager {
"Must specify nodeId/tag to query cardinality");
}
NodeToCountedTags mapping;
TypeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppNodeMappings.get(applicationId);
} else {
@ -419,7 +492,7 @@ public class AllocationTagsManager {
"Must specify rack/tag to query cardinality");
}
NodeToCountedTags mapping;
TypeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppRackMappings.get(applicationId);
} else {
@ -492,7 +565,7 @@ public class AllocationTagsManager {
"Must specify nodeId/tags/op to query cardinality");
}
NodeToCountedTags mapping;
TypeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppNodeMappings.get(applicationId);
} else {
@ -540,7 +613,7 @@ public class AllocationTagsManager {
"Must specify rack/tags/op to query cardinality");
}
NodeToCountedTags mapping;
TypeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppRackMappings.get(applicationId);
} else {

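To show how the new temp-container bookkeeping is meant to be used, here is a hedged sketch of a single placement cycle; the RMContext instance, the host string and the "hbase-rs" tag are illustrative assumptions (ImmutableSet is Guava's, as in the tests below), not values taken from the patch.

// Sketch only: exercises the addTempContainer/cleanTempContainers APIs added above.
static void placementCycleSketch(RMContext rmContext)
    throws InvalidAllocationTagsQueryException {
  AllocationTagsManager atm = new AllocationTagsManager(rmContext);
  ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
  NodeId node = NodeId.fromString("host1:123");
  // Record a tentative placement before any container is actually allocated.
  atm.addTempContainer(node, appId, ImmutableSet.of("hbase-rs"));
  // Constraint queries issued during the cycle now see the temporary tag.
  long cardinality = atm.getNodeCardinality(node, appId, "hbase-rs"); // expected 1
  // At the end of the cycle, drop every temporary tag recorded for the app.
  atm.cleanTempContainers(appId);
}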

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.NodeCandidateSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Basic placement algorithm.
* Supports different Iterators at SchedulingRequest level including:
* Serial, PopularTags
*/
public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultPlacementAlgorithm.class);
private AllocationTagsManager tagsManager;
private PlacementConstraintManager constraintManager;
private NodeCandidateSelector nodeSelector;
@Override
public void init(RMContext rmContext) {
this.tagsManager = rmContext.getAllocationTagsManager();
this.constraintManager = rmContext.getPlacementConstraintManager();
this.nodeSelector =
filter -> ((AbstractYarnScheduler) (rmContext).getScheduler())
.getNodes(filter);
}
/**
* TODO: Method will be moved to PlacementConstraintsUtil class (YARN-7682)
* @param applicationId application whose placement constraint is evaluated.
* @param allocationTags allocation tags of the scheduling request.
* @param nodeId candidate node for the placement.
* @param tagsManager allocation tags manager used to query tag cardinality.
* @return true if the request can be placed on the node without violating
*         the constraint, false otherwise.
* @throws InvalidAllocationTagsQueryException
*/
public boolean canAssign(ApplicationId applicationId,
Set<String> allocationTags, NodeId nodeId,
AllocationTagsManager tagsManager)
throws InvalidAllocationTagsQueryException {
PlacementConstraint constraint =
constraintManager.getConstraint(applicationId, allocationTags);
if (constraint == null) {
return true;
}
// TODO: proper transformations
// Currently works only for simple anti-affinity
// NODE scope target expressions
PlacementConstraintTransformations.SpecializedConstraintTransformer transformer =
new PlacementConstraintTransformations.SpecializedConstraintTransformer(
constraint);
PlacementConstraint transform = transformer.transform();
PlacementConstraint.TargetConstraint targetConstraint =
(PlacementConstraint.TargetConstraint) transform.getConstraintExpr();
// Assume a single target expression tag;
// The Sample Algorithm assumes a constraint will always be a simple
// Target Constraint with a single entry in the target set.
// As mentioned in the class javadoc - This algorithm should be
// used mostly for testing and validating end-2-end workflow.
String targetTag = targetConstraint.getTargetExpressions().iterator().next()
.getTargetValues().iterator().next();
// TODO: Assuming anti-affinity constraint
long nodeCardinality =
tagsManager.getNodeCardinality(nodeId, applicationId, targetTag);
if (nodeCardinality != 0) {
return false;
}
// return true if it is a valid placement
return true;
}
public boolean attemptPlacementOnNode(ApplicationId appId,
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
throws InvalidAllocationTagsQueryException {
int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
if (numAllocs > 0) {
if (canAssign(appId,
schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(),
tagsManager)) {
return true;
}
}
return false;
}
@Override
public void place(ConstraintPlacementAlgorithmInput input,
ConstraintPlacementAlgorithmOutputCollector collector) {
BatchedRequests requests = (BatchedRequests) input;
ConstraintPlacementAlgorithmOutput resp =
new ConstraintPlacementAlgorithmOutput(requests.getApplicationId());
List<SchedulerNode> allNodes = nodeSelector.selectNodes(null);
Iterator<SchedulingRequest> requestIterator = requests.iterator();
while (requestIterator.hasNext()) {
SchedulingRequest schedulingRequest = requestIterator.next();
Iterator<SchedulerNode> nodeIter = allNodes.iterator();
int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
while (nodeIter.hasNext() && numAllocs > 0) {
SchedulerNode node = nodeIter.next();
try {
if (attemptPlacementOnNode(requests.getApplicationId(),
schedulingRequest, node)) {
schedulingRequest.getResourceSizing()
.setNumAllocations(--numAllocs);
PlacedSchedulingRequest placedReq =
new PlacedSchedulingRequest(schedulingRequest);
placedReq.setPlacementAttempt(requests.getPlacementAttempt());
placedReq.getNodes().add(node);
resp.getPlacedRequests().add(placedReq);
numAllocs =
schedulingRequest.getResourceSizing().getNumAllocations();
// Add temp-container tags for current placement cycle
this.tagsManager.addTempContainer(node.getNodeID(),
requests.getApplicationId(),
schedulingRequest.getAllocationTags());
}
} catch (InvalidAllocationTagsQueryException e) {
LOG.warn("Got exception from TagManager !", e);
}
}
}
// Add all requests whose numAllocations still > 0 to rejected list.
requests.getSchedulingRequests().stream()
.filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
.forEach(rejReq -> resp.getRejectedRequests().add(rejReq));
collector.collect(resp);
// Clean current temp-container tags
this.tagsManager.cleanTempContainers(requests.getApplicationId());
}
}
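The check above boils down to an anti-affinity rule: a request tagged T is only placeable on a node whose cardinality for T is still zero. A hedged usage sketch, assuming an initialized DefaultPlacementAlgorithm plus appId, nodeId and tagsManager from the surrounding context (the call declares InvalidAllocationTagsQueryException):

// With an anti-affinity NOT-IN(node, "hbase-rs") constraint registered for the app:
boolean first = algorithm.canAssign(
    appId, ImmutableSet.of("hbase-rs"), nodeId, tagsManager);  // true: cardinality is 0
// ... a container carrying "hbase-rs" is then placed (or temp-placed) on nodeId ...
boolean second = algorithm.canAssign(
    appId, ImmutableSet.of("hbase-rs"), nodeId, tagsManager);  // false: cardinality is now 1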


@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
/**
* Traverse Scheduling requests with the most popular tags (count) first.
* Currently the count is per Batch but could use TagManager for global count.
*/
public class PopularTagsIterator implements Iterator<SchedulingRequest> {
private final List<SchedulingRequest> schedulingRequestList;
private int cursor;
public PopularTagsIterator(Collection<SchedulingRequest> schedulingRequests) {
this.schedulingRequestList = new ArrayList<>(schedulingRequests);
// Most popular First
Collections.sort(schedulingRequestList,
(o1, o2) -> (int) getTagPopularity(o2) - (int) getTagPopularity(o1));
this.cursor = 0;
}
private long getTagPopularity(SchedulingRequest o1) {
long max = 0;
for (String tag : o1.getAllocationTags()) {
long count = schedulingRequestList.stream()
.filter(req -> req.getAllocationTags().contains(tag)).count();
if (count > max) {
max = count;
}
}
return max;
}
@Override
public boolean hasNext() {
return (cursor < schedulingRequestList.size());
}
@Override
public SchedulingRequest next() {
if (hasNext()) {
return schedulingRequestList.get(cursor++);
}
throw new NoSuchElementException();
}
}


@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
/**
* Traverse Scheduling Requests in the same order as they arrive
*/
public class SerialIterator implements Iterator<SchedulingRequest> {
private final List<SchedulingRequest> schedulingRequestList;
private int cursor;
public SerialIterator(Collection<SchedulingRequest> schedulingRequests) {
this.schedulingRequestList = new ArrayList<>(schedulingRequests);
this.cursor = 0;
}
@Override
public boolean hasNext() {
return (cursor < schedulingRequestList.size());
}
@Override
public SchedulingRequest next() {
if (hasNext()) {
return schedulingRequestList.get(cursor++);
}
throw new NoSuchElementException();
}
}


@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators
* contains the SchedulingRequest iterators used by constraint based placement
* algorithms.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;


@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm
* contains placement algorithm implementations for constraint based
* container placement.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;


@ -21,12 +21,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators.PopularTagsIterator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators.SerialIterator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@ -35,7 +38,8 @@ import java.util.Set;
* to place as a batch. The placement algorithm tends to give more optimal
* placements if more requests are batched together.
*/
class BatchedRequests implements ConstraintPlacementAlgorithmInput {
public class BatchedRequests
implements ConstraintPlacementAlgorithmInput, Iterable<SchedulingRequest> {
// PlacementAlgorithmOutput attempt - the number of times the requests in this
// batch has been placed but was rejected by the scheduler.
@ -44,19 +48,46 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
private final ApplicationId applicationId;
private final Collection<SchedulingRequest> requests;
private final Map<String, Set<NodeId>> blacklist = new HashMap<>();
private IteratorType iteratorType;
BatchedRequests(ApplicationId applicationId,
/**
* Iterator Type.
*/
public enum IteratorType {
SERIAL,
POPULAR_TAGS
}
public BatchedRequests(IteratorType type, ApplicationId applicationId,
Collection<SchedulingRequest> requests, int attempt) {
this.iteratorType = type;
this.applicationId = applicationId;
this.requests = requests;
this.placementAttempt = attempt;
}
/**
* Exposes SchedulingRequest Iterator interface which can be used
* to traverse requests using different heuristics i.e. Tag Popularity
* @return SchedulingRequest Iterator.
*/
@Override
public Iterator<SchedulingRequest> iterator() {
switch (this.iteratorType) {
case SERIAL:
return new SerialIterator(requests);
case POPULAR_TAGS:
return new PopularTagsIterator(requests);
default:
return null;
}
}
/**
* Get Application Id.
* @return Application Id.
*/
ApplicationId getApplicationId() {
public ApplicationId getApplicationId() {
return applicationId;
}
@ -73,11 +104,11 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
* Add a Scheduling request to the batch.
* @param req Scheduling Request.
*/
void addToBatch(SchedulingRequest req) {
public void addToBatch(SchedulingRequest req) {
requests.add(req);
}
void addToBlacklist(Set<String> tags, SchedulerNode node) {
public void addToBlacklist(Set<String> tags, SchedulerNode node) {
if (tags != null && !tags.isEmpty()) {
// We are currently assuming a single allocation tag
// per scheduler request currently.
@ -90,7 +121,7 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
* Get placement attempt.
* @return PlacementAlgorithmOutput placement Attempt.
*/
int getPlacementAttempt() {
public int getPlacementAttempt() {
return placementAttempt;
}
@ -99,7 +130,7 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
* @param tag Tag.
* @return Set of blacklisted Nodes.
*/
Set<NodeId> getBlacklist(String tag) {
public Set<NodeId> getBlacklist(String tag) {
return blacklist.getOrDefault(tag, Collections.EMPTY_SET);
}
}
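Because BatchedRequests is now Iterable, the traversal heuristic is picked per batch. A hedged sketch (the schedulingRequest(...) helper is the test utility made protected in TestPlacementProcessor later in this commit; appId is assumed to be available):

List<SchedulingRequest> reqs = Arrays.asList(
    schedulingRequest(1, 1, 1, 512, "storm"),
    schedulingRequest(1, 2, 1, 512, "hbase"),
    schedulingRequest(1, 3, 1, 512, "hbase"));
BatchedRequests batch = new BatchedRequests(
    BatchedRequests.IteratorType.POPULAR_TAGS, appId, reqs, 1);
for (SchedulingRequest req : batch) {
  // With POPULAR_TAGS the two "hbase" requests come out before "storm";
  // IteratorType.SERIAL would preserve arrival order instead.
}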


@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
@ -98,6 +100,7 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
private Map<ApplicationId, List<SchedulingRequest>> requestsToReject =
new ConcurrentHashMap<>();
private BatchedRequests.IteratorType iteratorType;
private PlacementDispatcher placementDispatcher;
@ -122,9 +125,20 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
if (instances != null && !instances.isEmpty()) {
algorithm = instances.get(0);
} else {
algorithm = new SamplePlacementAlgorithm();
algorithm = new DefaultPlacementAlgorithm();
}
LOG.info("Placement Algorithm [{}]", algorithm.getClass().getName());
String iteratorName = ((RMContextImpl) amsContext).getYarnConfiguration()
.get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR,
BatchedRequests.IteratorType.SERIAL.name());
LOG.info("Placement Algorithm Iterator[{}]", iteratorName);
try {
iteratorType = BatchedRequests.IteratorType.valueOf(iteratorName);
} catch (IllegalArgumentException e) {
throw new YarnRuntimeException(
"Could not instantiate Placement Algorithm Iterator: ", e);
}
LOG.info("Planning Algorithm [{}]", algorithm.getClass().getName());
int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE,
@ -188,9 +202,8 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
List<SchedulingRequest> schedulingRequests) {
if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
this.placementDispatcher.dispatch(
new BatchedRequests(appAttemptId.getApplicationId(),
schedulingRequests, 1));
this.placementDispatcher.dispatch(new BatchedRequests(iteratorType,
appAttemptId.getApplicationId(), schedulingRequests, 1));
}
}
@ -329,11 +342,10 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
}
}
if (!isAdded) {
BatchedRequests br =
new BatchedRequests(schedulerResponse.getApplicationId(),
Collections.singleton(
schedulerResponse.getSchedulingRequest()),
placementAttempt + 1);
BatchedRequests br = new BatchedRequests(iteratorType,
schedulerResponse.getApplicationId(),
Collections.singleton(schedulerResponse.getSchedulingRequest()),
placementAttempt + 1);
reqsToRetry.add(br);
br.addToBlacklist(
schedulerResponse.getSchedulingRequest().getAllocationTags(),


@ -1,144 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SpecializedConstraintTransformer;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* Sample Test algorithm. Assumes anti-affinity always
* It also assumes the numAllocations in resource sizing is always = 1
*
* NOTE: This is just a sample implementation. Not be actually used
*/
public class SamplePlacementAlgorithm implements ConstraintPlacementAlgorithm {
private static final Logger LOG =
LoggerFactory.getLogger(SamplePlacementAlgorithm.class);
private AllocationTagsManager tagsManager;
private PlacementConstraintManager constraintManager;
private NodeCandidateSelector nodeSelector;
@Override
public void init(RMContext rmContext) {
this.tagsManager = rmContext.getAllocationTagsManager();
this.constraintManager = rmContext.getPlacementConstraintManager();
this.nodeSelector =
filter -> ((AbstractYarnScheduler)(rmContext)
.getScheduler()).getNodes(filter);
}
@Override
public void place(ConstraintPlacementAlgorithmInput input,
ConstraintPlacementAlgorithmOutputCollector collector) {
BatchedRequests requests = (BatchedRequests)input;
ConstraintPlacementAlgorithmOutput resp =
new ConstraintPlacementAlgorithmOutput(requests.getApplicationId());
List<SchedulerNode> allNodes = nodeSelector.selectNodes(null);
Map<String, List<SchedulingRequest>> tagIndexedRequests = new HashMap<>();
requests.getSchedulingRequests()
.stream()
.filter(r -> r.getAllocationTags() != null)
.forEach(
req -> req.getAllocationTags().forEach(
tag -> tagIndexedRequests.computeIfAbsent(tag,
k -> new ArrayList<>()).add(req))
);
for (Map.Entry<String, List<SchedulingRequest>> entry :
tagIndexedRequests.entrySet()) {
String tag = entry.getKey();
PlacementConstraint constraint =
constraintManager.getConstraint(requests.getApplicationId(),
Collections.singleton(tag));
if (constraint != null) {
// Currently works only for simple anti-affinity
// NODE scope target expressions
SpecializedConstraintTransformer transformer =
new SpecializedConstraintTransformer(constraint);
PlacementConstraint transform = transformer.transform();
TargetConstraint targetConstraint =
(TargetConstraint) transform.getConstraintExpr();
// Assume a single target expression tag;
// The Sample Algorithm assumes a constraint will always be a simple
// Target Constraint with a single entry in the target set.
// As mentioned in the class javadoc - This algorithm should be
// used mostly for testing and validating end-2-end workflow.
String targetTag =
targetConstraint.getTargetExpressions().iterator().next()
.getTargetValues().iterator().next();
// iterate over all nodes
Iterator<SchedulerNode> nodeIter = allNodes.iterator();
List<SchedulingRequest> schedulingRequests = entry.getValue();
Iterator<SchedulingRequest> reqIter = schedulingRequests.iterator();
while (reqIter.hasNext()) {
SchedulingRequest sReq = reqIter.next();
int numAllocs = sReq.getResourceSizing().getNumAllocations();
while (numAllocs > 0 && nodeIter.hasNext()) {
SchedulerNode node = nodeIter.next();
long nodeCardinality = 0;
try {
nodeCardinality = tagsManager.getNodeCardinality(
node.getNodeID(), requests.getApplicationId(),
targetTag);
if (nodeCardinality == 0 &&
!requests.getBlacklist(tag).contains(node.getNodeID())) {
numAllocs--;
sReq.getResourceSizing().setNumAllocations(numAllocs);
PlacedSchedulingRequest placedReq =
new PlacedSchedulingRequest(sReq);
placedReq.setPlacementAttempt(requests.getPlacementAttempt());
placedReq.getNodes().add(node);
resp.getPlacedRequests().add(placedReq);
}
} catch (InvalidAllocationTagsQueryException e) {
LOG.warn("Got exception from TagManager !", e);
}
}
}
}
}
// Add all requests whose numAllocations still > 0 to rejected list.
requests.getSchedulingRequests().stream()
.filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
.forEach(rejReq -> resp.getRejectedRequests().add(rejReq));
collector.collect(resp);
}
}


@ -75,24 +75,24 @@ public class TestAllocationTagsManager {
// 3 Containers from app1
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
// 1 Container from app2
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Get Node Cardinality of app1 on node1, with tag "mapper"
@ -170,24 +170,21 @@ public class TestAllocationTagsManager {
// Finish all containers:
atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
// Expect all cardinality to be 0
// Get Cardinality of app1 on node1, with tag "mapper"
@ -270,25 +267,22 @@ public class TestAllocationTagsManager {
// 3 Containers from app1
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 2),
TestUtils.getMockContainerId(2, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 4),
ImmutableSet.of("reducer"));
TestUtils.getMockContainerId(2, 4), ImmutableSet.of("reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
// 1 Container from app2
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
// Get Rack Cardinality of app1 on rack0, with tag "mapper"
Assert.assertEquals(1, atm.getRackCardinality("rack0",
@ -325,45 +319,39 @@ public class TestAllocationTagsManager {
// Add a bunch of containers
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
// Remove all these containers
atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
atm.removeContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
// Check internal data structure
Assert.assertEquals(0,
@ -374,6 +362,87 @@ public class TestAllocationTagsManager {
Assert.assertEquals(0, atm.getPerAppRackMappings().size());
}
@Test
public void testTempContainerAllocations()
throws InvalidAllocationTagsQueryException {
/**
* Construct both TEMP and normal containers:
* Node1: TEMP container_1_1 (mapper/reducer/app_1), container_1_2 (service/app_1)
* Node2: container_1_3 (reducer/app_1), TEMP container_2_1 (service/app_2)
*/
AllocationTagsManager atm = new AllocationTagsManager(rmContext);
// 3 Containers from app1
atm.addTempContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockContainerId(1, 2), ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockContainerId(1, 3), ImmutableSet.of("reducer"));
// 1 Container from app2
atm.addTempContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"));
// Expect tag mappings to be present including temp Tags
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::sum));
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
Long::sum));
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
Long::sum));
// Do a temp Tag cleanup on app2
atm.cleanTempContainers(TestUtils.getMockApplicationId(2));
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
Long::sum));
// Expect app1 to be unaffected
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::sum));
// Do a cleanup on app1 as well
atm.cleanTempContainers(TestUtils.getMockApplicationId(1));
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::sum));
// Non temp-tags should be unaffected
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
Long::sum));
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
Long::sum));
// Expect app2 with no containers, and app1 with 2 containers across 2 nodes
Assert.assertEquals(2,
atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1))
.getTypeToTagsWithCount().size());
Assert.assertNull(
atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2)));
}
@Test
public void testQueryCardinalityWithIllegalParameters()
throws InvalidAllocationTagsQueryException {
@ -385,24 +454,21 @@ public class TestAllocationTagsManager {
// Add a bunch of containers
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
// No node-id
boolean caughtException = false;


@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TestPlacementProcessor.schedulingRequest;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests;
import org.junit.Assert;
import org.junit.Test;
/**
* Test Request Iterator.
*/
public class TestBatchedRequestsIterators {
@Test
public void testSerialIterator() throws Exception {
List<SchedulingRequest> schedulingRequestList =
Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
schedulingRequest(1, 2, 1, 512, "foo"),
schedulingRequest(1, 3, 1, 512, "foo"),
schedulingRequest(1, 4, 1, 512, "foo"));
BatchedRequests batchedRequests = new BatchedRequests(
BatchedRequests.IteratorType.SERIAL, null, schedulingRequestList, 1);
Iterator<SchedulingRequest> requestIterator = batchedRequests.iterator();
long prevAllocId = 0;
while (requestIterator.hasNext()) {
SchedulingRequest request = requestIterator.next();
Assert.assertTrue(request.getAllocationRequestId() > prevAllocId);
prevAllocId = request.getAllocationRequestId();
}
}
@Test
public void testPopularTagsIterator() throws Exception {
List<SchedulingRequest> schedulingRequestList =
Arrays.asList(schedulingRequest(1, 1, 1, 512, "pri", "foo"),
schedulingRequest(1, 2, 1, 512, "bar"),
schedulingRequest(1, 3, 1, 512, "foo", "pri"),
schedulingRequest(1, 4, 1, 512, "test"),
schedulingRequest(1, 5, 1, 512, "pri", "bar"));
BatchedRequests batchedRequests =
new BatchedRequests(BatchedRequests.IteratorType.POPULAR_TAGS, null,
schedulingRequestList, 1);
Iterator<SchedulingRequest> requestIterator = batchedRequests.iterator();
long recCount = 0;
while (requestIterator.hasNext()) {
SchedulingRequest request = requestIterator.next();
if (recCount < 3) {
Assert.assertTrue(request.getAllocationTags().contains("pri"));
} else {
Assert.assertTrue(request.getAllocationTags().contains("bar")
|| request.getAllocationTags().contains("test"));
}
recCount++;
}
}
}


@ -373,13 +373,13 @@ public class TestPlacementProcessor {
rej.getReason());
}
private static SchedulingRequest schedulingRequest(
protected static SchedulingRequest schedulingRequest(
int priority, long allocReqId, int cores, int mem, String... tags) {
return schedulingRequest(priority, allocReqId, cores, mem,
ExecutionType.GUARANTEED, tags);
}
private static SchedulingRequest schedulingRequest(
protected static SchedulingRequest schedulingRequest(
int priority, long allocReqId, int cores, int mem,
ExecutionType execType, String... tags) {
return SchedulingRequest.newBuilder()