YARN-6596. Introduce Placement Constraint Manager module. (Konstantinos Karanasos via asuresh)

This commit is contained in:
Arun Suresh 2017-12-22 13:26:30 -08:00
parent 37f1a7b64f
commit 1efb2b6f25
9 changed files with 784 additions and 0 deletions

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
@ -109,6 +110,7 @@ public class RMActiveServiceContext {
private RMAppLifetimeMonitor rmAppLifetimeMonitor;
private QueueLimitCalculator queueLimitCalculator;
private AllocationTagsManager allocationTagsManager;
private PlacementConstraintManager placementConstraintManager;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
@ -411,6 +413,19 @@ public class RMActiveServiceContext {
this.allocationTagsManager = allocationTagsManager;
}
@Private
@Unstable
public PlacementConstraintManager getPlacementConstraintManager() {
return placementConstraintManager;
}
@Private
@Unstable
public void setPlacementConstraintManager(
PlacementConstraintManager placementConstraintManager) {
this.placementConstraintManager = placementConstraintManager;
}
@Private
@Unstable
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@ -171,4 +172,9 @@ public interface RMContext extends ApplicationMasterServiceContext {
AllocationTagsManager getAllocationTagsManager();
void setAllocationTagsManager(AllocationTagsManager allocationTagsManager);
PlacementConstraintManager getPlacementConstraintManager();
void setPlacementConstraintManager(
PlacementConstraintManager placementConstraintManager);
}

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
@ -515,6 +516,18 @@ public class RMContextImpl implements RMContext {
activeServiceContext.setAllocationTagsManager(allocationTagsManager);
}
@Override
public PlacementConstraintManager getPlacementConstraintManager() {
return activeServiceContext.getPlacementConstraintManager();
}
@Override
public void setPlacementConstraintManager(
PlacementConstraintManager placementConstraintManager) {
activeServiceContext
.setPlacementConstraintManager(placementConstraintManager);
}
@Override
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
return activeServiceContext.getRMDelegatedNodeLabelsUpdater();

View File

@ -97,6 +97,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@ -498,6 +500,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected AllocationTagsManager createAllocationTagsManager() {
return new AllocationTagsManager(this.rmContext);
}
protected PlacementConstraintManagerService
createPlacementConstraintManager() {
// Use the in memory Placement Constraint Manager.
return new MemoryPlacementConstraintManager();
}
protected DelegationTokenRenewer createDelegationTokenRenewer() {
return new DelegationTokenRenewer();
@ -628,6 +636,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
createAllocationTagsManager();
rmContext.setAllocationTagsManager(allocationTagsManager);
PlacementConstraintManagerService placementConstraintManager =
createPlacementConstraintManager();
addService(placementConstraintManager);
rmContext.setPlacementConstraintManager(placementConstraintManager);
RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
createRMDelegatedNodeLabelsUpdater();
if (delegatedNodeLabelsUpdater != null) {

View File

@ -0,0 +1,282 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* In memory implementation of the {@link PlacementConstraintManagerService}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MemoryPlacementConstraintManager
extends PlacementConstraintManagerService {
private static final Logger LOG =
LoggerFactory.getLogger(MemoryPlacementConstraintManager.class);
private ReentrantReadWriteLock.ReadLock readLock;
private ReentrantReadWriteLock.WriteLock writeLock;
/**
* Stores the global constraints that will be manipulated by the cluster
* admin. The key of each entry is the tag that will enable the corresponding
* constraint.
*/
private Map<String, PlacementConstraint> globalConstraints;
/**
* Stores the constraints for each application, along with the allocation tags
* that will enable each of the constraints for a given application.
*/
private Map<ApplicationId, Map<String, PlacementConstraint>> appConstraints;
public MemoryPlacementConstraintManager() {
this.globalConstraints = new HashMap<>();
this.appConstraints = new HashMap<>();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
@Override
public void registerApplication(ApplicationId appId,
Map<Set<String>, PlacementConstraint> constraintMap) {
// Check if app already exists. If not, prepare its constraint map.
Map<String, PlacementConstraint> constraintsForApp = new HashMap<>();
try {
readLock.lock();
if (appConstraints.get(appId) != null) {
LOG.warn("Application {} has already been registered.", appId);
return;
}
// Go over each sourceTag-constraint pair, validate it, and add it to the
// constraint map for this app.
for (Map.Entry<Set<String>, PlacementConstraint> entry : constraintMap
.entrySet()) {
Set<String> sourceTags = entry.getKey();
PlacementConstraint constraint = entry.getValue();
if (validateConstraint(sourceTags, constraint)) {
String sourceTag = getValidSourceTag(sourceTags);
constraintsForApp.put(sourceTag, constraint);
}
}
} finally {
readLock.unlock();
}
if (constraintsForApp.isEmpty()) {
LOG.info("Application {} was registered, but no constraints were added.",
appId);
}
// Update appConstraints.
try {
writeLock.lock();
appConstraints.put(appId, constraintsForApp);
} finally {
writeLock.unlock();
}
}
@Override
public void addConstraint(ApplicationId appId, Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace) {
try {
writeLock.lock();
Map<String, PlacementConstraint> constraintsForApp =
appConstraints.get(appId);
if (constraintsForApp == null) {
LOG.info("Cannot add constraint to application {}, as it has not "
+ "been registered yet.", appId);
return;
}
addConstraintToMap(constraintsForApp, sourceTags, placementConstraint,
replace);
} finally {
writeLock.unlock();
}
}
@Override
public void addGlobalConstraint(Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace) {
try {
writeLock.lock();
addConstraintToMap(globalConstraints, sourceTags, placementConstraint,
replace);
} finally {
writeLock.unlock();
}
}
/**
* Helper method that adds a constraint to a map for a given source tag.
* Assumes there is already a lock on the constraint map.
*
* @param constraintMap constraint map to which the constraint will be added
* @param sourceTags the source tags that will enable this constraint
* @param placementConstraint the new constraint to be added
* @param replace if true, an existing constraint for these sourceTags will be
* replaced with the new one
*/
private void addConstraintToMap(
Map<String, PlacementConstraint> constraintMap, Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace) {
if (validateConstraint(sourceTags, placementConstraint)) {
String sourceTag = getValidSourceTag(sourceTags);
if (constraintMap.get(sourceTag) == null || replace) {
if (replace) {
LOG.info("Replacing the constraint associated with tag {} with {}.",
sourceTag, placementConstraint);
}
constraintMap.put(sourceTag, placementConstraint);
} else {
LOG.info("Constraint {} will not be added. There is already a "
+ "constraint associated with tag {}.",
placementConstraint, sourceTag);
}
}
}
@Override
public Map<Set<String>, PlacementConstraint> getConstraints(
ApplicationId appId) {
try {
readLock.lock();
if (appConstraints.get(appId) == null) {
LOG.info("Application {} is not registered in the Placement "
+ "Constraint Manager.", appId);
return null;
}
// Copy to a new map and return an unmodifiable version of it.
// Each key of the map is a set with a single source tag.
Map<Set<String>, PlacementConstraint> constraintMap =
appConstraints.get(appId).entrySet().stream()
.collect(Collectors.toMap(
e -> Stream.of(e.getKey()).collect(Collectors.toSet()),
e -> e.getValue()));
return Collections.unmodifiableMap(constraintMap);
} finally {
readLock.unlock();
}
}
@Override
public PlacementConstraint getConstraint(ApplicationId appId,
Set<String> sourceTags) {
if (!validateSourceTags(sourceTags)) {
return null;
}
String sourceTag = getValidSourceTag(sourceTags);
try {
readLock.lock();
if (appConstraints.get(appId) == null) {
LOG.info("Application {} is not registered in the Placement "
+ "Constraint Manager.", appId);
return null;
}
// TODO: Merge this constraint with the global one for this tag, if one
// exists.
return appConstraints.get(appId).get(sourceTag);
} finally {
readLock.unlock();
}
}
@Override
public PlacementConstraint getGlobalConstraint(Set<String> sourceTags) {
if (!validateSourceTags(sourceTags)) {
return null;
}
String sourceTag = getValidSourceTag(sourceTags);
try {
readLock.lock();
return globalConstraints.get(sourceTag);
} finally {
readLock.unlock();
}
}
@Override
public void unregisterApplication(ApplicationId appId) {
try {
writeLock.lock();
appConstraints.remove(appId);
} finally {
writeLock.unlock();
}
}
@Override
public void removeGlobalConstraint(Set<String> sourceTags) {
if (!validateSourceTags(sourceTags)) {
return;
}
String sourceTag = getValidSourceTag(sourceTags);
try {
writeLock.lock();
globalConstraints.remove(sourceTag);
} finally {
writeLock.unlock();
}
}
@Override
public int getNumRegisteredApplications() {
try {
readLock.lock();
return appConstraints.size();
} finally {
readLock.unlock();
}
}
@Override
public int getNumGlobalConstraints() {
try {
readLock.lock();
return globalConstraints.size();
} finally {
readLock.unlock();
}
}
}

View File

@ -0,0 +1,151 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
/**
* Interface for storing and retrieving placement constraints (see
* {@link PlacementConstraint}).
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface PlacementConstraintManager {
/**
* Register all placement constraints of an application.
*
* @param appId the application ID
* @param constraintMap the map of allocation tags to constraints for this
* application
*/
void registerApplication(ApplicationId appId,
Map<Set<String>, PlacementConstraint> constraintMap);
/**
* Add a placement constraint for a given application and a given set of
* (source) allocation tags. The constraint will be used on Scheduling
* Requests that carry this set of allocation tags.
* TODO: Support merge and not only replace when adding a constraint.
*
* @param appId the application ID
* @param sourceTags the set of allocation tags that will enable this
* constraint
* @param placementConstraint the constraint
* @param replace if true, an existing constraint for these tags will be
* replaced by the given one
*/
void addConstraint(ApplicationId appId, Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace);
/**
* Add a placement constraint that will be used globally. These constraints
* are added by the cluster administrator.
* TODO: Support merge and not only replace when adding a constraint.
*
* @param sourceTags the allocation tags that will enable this constraint
* @param placementConstraint the constraint
* @param replace if true, an existing constraint for these tags will be
* replaced by the given one
*/
void addGlobalConstraint(Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace);
/**
* Retrieve all constraints for a given application, along with the allocation
* tags that enable each constraint.
*
* @param appId the application ID
* @return the constraints for this application with the associated tags
*/
Map<Set<String>, PlacementConstraint> getConstraints(ApplicationId appId);
/**
* Retrieve the placement constraint that is associated with a set of
* allocation tags for a given application.
*
* @param appId the application ID
* @param sourceTags the allocation tags that enable this constraint
* @return the constraint
*/
PlacementConstraint getConstraint(ApplicationId appId,
Set<String> sourceTags);
/**
* Retrieve a global constraint that is associated with a given set of
* allocation tags.
*
* @param sourceTags the allocation tags that enable this constraint
* @return the constraint
*/
PlacementConstraint getGlobalConstraint(Set<String> sourceTags);
/**
* Remove the constraints that correspond to a given application.
*
* @param appId the application that will be removed.
*/
void unregisterApplication(ApplicationId appId);
/**
* Remove a global constraint that is associated with the given allocation
* tags.
*
* @param sourceTags the allocation tags
*/
void removeGlobalConstraint(Set<String> sourceTags);
/**
* Returns the number of currently registered applications in the Placement
* Constraint Manager.
*
* @return number of registered applications.
*/
int getNumRegisteredApplications();
/**
* Returns the number of global constraints registered in the Placement
* Constraint Manager.
*
* @return number of global constraints.
*/
int getNumGlobalConstraints();
/**
* Validate a placement constraint and the set of allocation tags that will
* enable it.
*
* @param sourceTags the associated allocation tags
* @param placementConstraint the constraint
* @return true if constraint and tags are valid
*/
default boolean validateConstraint(Set<String> sourceTags,
PlacementConstraint placementConstraint) {
return true;
}
}

View File

@ -0,0 +1,93 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
/**
* The service that implements the {@link PlacementConstraintManager} interface.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class PlacementConstraintManagerService extends AbstractService
implements PlacementConstraintManager {
protected static final Log LOG =
LogFactory.getLog(PlacementConstraintManagerService.class);
private PlacementConstraintManager placementConstraintManager = null;
public PlacementConstraintManagerService() {
super(PlacementConstraintManagerService.class.getName());
}
@Override
public boolean validateConstraint(Set<String> sourceTags,
PlacementConstraint placementConstraint) {
if (!validateSourceTags(sourceTags)) {
return false;
}
// TODO: Perform actual validation of the constraint (in YARN-6621).
// TODO: Perform satisfiability check for constraint.
return true;
}
/**
* Validates whether the allocation tags that will enable a constraint have
* the expected format. At the moment we support a single allocation tag per
* constraint.
*
* @param sourceTags the source allocation tags
* @return true if the tags have the expected format
*/
protected boolean validateSourceTags(Set<String> sourceTags) {
if (sourceTags.isEmpty()) {
LOG.warn("A placement constraint cannot be associated with an empty "
+ "set of tags.");
return false;
}
if (sourceTags.size() > 1) {
LOG.warn("Only a single tag can be associated with a placement "
+ "constraint currently.");
return false;
}
return true;
}
/**
* This method will return a single allocation tag. It should be called after
* validating the tags by calling {@link #validateSourceTags}.
*
* @param sourceTags the source allocation tags
* @return the single source tag
*/
protected String getValidSourceTag(Set<String> sourceTags) {
return sourceTags.iterator().next();
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
* contains classes related to scheduling containers using placement
* constraints.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.nodeAttribute;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Unit tests for {@link PlacementConstraintManagerService}.
*/
public class TestPlacementConstraintManagerService {
private PlacementConstraintManagerService pcm;
protected PlacementConstraintManagerService createPCM() {
return new MemoryPlacementConstraintManager();
}
private ApplicationId appId1, appId2;
private PlacementConstraint c1, c2, c3, c4;
private Set<String> sourceTag1, sourceTag2, sourceTag3, sourceTag4;
private Map<Set<String>, PlacementConstraint> constraintMap1, constraintMap2;
@Before
public void before() {
this.pcm = createPCM();
// Build appIDs, constraints, source tags, and constraint map.
long ts = System.currentTimeMillis();
appId1 = BuilderUtils.newApplicationId(ts, 123);
appId2 = BuilderUtils.newApplicationId(ts, 234);
c1 = PlacementConstraints.build(targetIn(NODE, allocationTag("hbase-m")));
c2 = PlacementConstraints.build(targetIn(RACK, allocationTag("hbase-rs")));
c3 = PlacementConstraints
.build(targetNotIn(NODE, nodeAttribute("java", "1.8")));
c4 = PlacementConstraints
.build(targetCardinality(RACK, 2, 10, allocationTag("zk")));
sourceTag1 = new HashSet<>(Arrays.asList("spark"));
sourceTag2 = new HashSet<>(Arrays.asList("zk"));
sourceTag3 = new HashSet<>(Arrays.asList("storm"));
sourceTag4 = new HashSet<>(Arrays.asList("hbase-m", "hbase-sec"));
constraintMap1 = Stream
.of(new SimpleEntry<>(sourceTag1, c1),
new SimpleEntry<>(sourceTag2, c2))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
constraintMap2 = Stream.of(new SimpleEntry<>(sourceTag3, c4))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
}
@Test
public void testRegisterUnregisterApps() {
Assert.assertEquals(0, pcm.getNumRegisteredApplications());
// Register two applications.
pcm.registerApplication(appId1, constraintMap1);
Assert.assertEquals(1, pcm.getNumRegisteredApplications());
Map<Set<String>, PlacementConstraint> constrMap =
pcm.getConstraints(appId1);
Assert.assertNotNull(constrMap);
Assert.assertEquals(2, constrMap.size());
Assert.assertNotNull(constrMap.get(sourceTag1));
Assert.assertNotNull(constrMap.get(sourceTag2));
pcm.registerApplication(appId2, constraintMap2);
Assert.assertEquals(2, pcm.getNumRegisteredApplications());
constrMap = pcm.getConstraints(appId2);
Assert.assertNotNull(constrMap);
Assert.assertEquals(1, constrMap.size());
Assert.assertNotNull(constrMap.get(sourceTag3));
Assert.assertNull(constrMap.get(sourceTag2));
// Try to register the same app again.
pcm.registerApplication(appId2, constraintMap1);
Assert.assertEquals(2, pcm.getNumRegisteredApplications());
// Unregister appId1.
pcm.unregisterApplication(appId1);
Assert.assertEquals(1, pcm.getNumRegisteredApplications());
Assert.assertNull(pcm.getConstraints(appId1));
Assert.assertNotNull(pcm.getConstraints(appId2));
}
@Test
public void testAddConstraint() {
// Cannot add constraint to unregistered app.
Assert.assertEquals(0, pcm.getNumRegisteredApplications());
pcm.addConstraint(appId1, sourceTag1, c1, false);
Assert.assertEquals(0, pcm.getNumRegisteredApplications());
// Register application.
pcm.registerApplication(appId1, new HashMap<>());
Assert.assertEquals(1, pcm.getNumRegisteredApplications());
Assert.assertEquals(0, pcm.getConstraints(appId1).size());
// Add two constraints.
pcm.addConstraint(appId1, sourceTag1, c1, false);
pcm.addConstraint(appId1, sourceTag2, c3, false);
Assert.assertEquals(2, pcm.getConstraints(appId1).size());
// Constraint for sourceTag1 should not be replaced.
pcm.addConstraint(appId1, sourceTag1, c2, false);
Assert.assertEquals(2, pcm.getConstraints(appId1).size());
Assert.assertEquals(c1, pcm.getConstraint(appId1, sourceTag1));
Assert.assertNotEquals(c2, pcm.getConstraint(appId1, sourceTag1));
// Now c2 should replace c1 for sourceTag1.
pcm.addConstraint(appId1, sourceTag1, c2, true);
Assert.assertEquals(2, pcm.getConstraints(appId1).size());
Assert.assertEquals(c2, pcm.getConstraint(appId1, sourceTag1));
}
@Test
public void testGlobalConstraints() {
Assert.assertEquals(0, pcm.getNumGlobalConstraints());
pcm.addGlobalConstraint(sourceTag1, c1, false);
Assert.assertEquals(1, pcm.getNumGlobalConstraints());
Assert.assertNotNull(pcm.getGlobalConstraint(sourceTag1));
// Constraint for sourceTag1 should not be replaced.
pcm.addGlobalConstraint(sourceTag1, c2, false);
Assert.assertEquals(1, pcm.getNumGlobalConstraints());
Assert.assertEquals(c1, pcm.getGlobalConstraint(sourceTag1));
Assert.assertNotEquals(c2, pcm.getGlobalConstraint(sourceTag1));
// Now c2 should replace c1 for sourceTag1.
pcm.addGlobalConstraint(sourceTag1, c2, true);
Assert.assertEquals(1, pcm.getNumGlobalConstraints());
Assert.assertEquals(c2, pcm.getGlobalConstraint(sourceTag1));
pcm.removeGlobalConstraint(sourceTag1);
Assert.assertEquals(0, pcm.getNumGlobalConstraints());
}
@Test
public void testValidateConstraint() {
// At the moment we only disallow multiple source tags to be associated with
// a constraint. TODO: More tests to be added for YARN-6621.
Assert.assertTrue(pcm.validateConstraint(sourceTag1, c1));
Assert.assertFalse(pcm.validateConstraint(sourceTag4, c1));
}
}