YARN-10360. Support Multi Node Placement in SingleConstraintAppPlacementAllocator

Reviewed by Sunil G.
Authored and committed by Prabhu Joseph, 2020-07-22 15:40:37 +05:30
Parent: 83fd15b412
Commit: 5c927eb550
5 changed files with 100 additions and 65 deletions

File: TestDSWithMultipleNodeManager.java (package org.apache.hadoop.yarn.applications.distributedshell)

@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.Iterator;
@@ -29,32 +31,83 @@ import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+* Test for Distributed Shell With Multiple Node Managers.
+* Parameter 0 tests with Single Node Placement and
+* parameter 1 tests with Multiple Node Placement.
+*/
+@RunWith(value = Parameterized.class)
public class TestDSWithMultipleNodeManager {
private static final Logger LOG =
LoggerFactory.getLogger(TestDSWithMultipleNodeManager.class);
static final int NUM_NMS = 2;
TestDistributedShell distShellTest;
+private final Boolean multiNodePlacementEnabled;
+private static final String POLICY_CLASS_NAME =
+"org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement."
++ "ResourceUsageMultiNodeLookupPolicy";
+@Parameterized.Parameters
+public static Collection<Boolean> getParams() {
+return Arrays.asList(false, true);
+}
+public TestDSWithMultipleNodeManager(Boolean multiNodePlacementEnabled) {
+this.multiNodePlacementEnabled = multiNodePlacementEnabled;
+}
+private YarnConfiguration getConfiguration(
+boolean multiNodePlacementConfigs) {
+YarnConfiguration conf = new YarnConfiguration();
+if (multiNodePlacementConfigs) {
+conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+DominantResourceCalculator.class.getName());
+conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ResourceScheduler.class);
+conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+"resource-based");
+conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+"resource-based");
+String policyName =
+CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
++ ".resource-based" + ".class";
+conf.set(policyName, POLICY_CLASS_NAME);
+conf.setBoolean(
+CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true);
+}
+return conf;
+}
@Before
public void setup() throws Exception {
distShellTest = new TestDistributedShell();
-distShellTest.setupInternal(NUM_NMS);
+distShellTest.setupInternal(NUM_NMS,
+getConfiguration(multiNodePlacementEnabled));
}
@After

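Note on the policy configured above: the ResourceUsageMultiNodeLookupPolicy referenced by POLICY_CLASS_NAME orders candidate nodes by their current resource usage, so the scheduler can walk them in a preferred order instead of looking only at the heartbeating node. The snippet below is only a rough, self-contained sketch of that idea; the CandidateNode type, its fields, and the least-allocated-first ordering are assumptions for illustration, not the actual policy implementation.

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a scheduler node; the real policy operates on
// SchedulerNode objects and YARN Resource values.
class CandidateNode {
  final String host;
  final long allocatedMemMb; // memory already allocated on this node

  CandidateNode(String host, long allocatedMemMb) {
    this.host = host;
    this.allocatedMemMb = allocatedMemMb;
  }
}

class ResourceUsageSortSketch {
  // Order candidates so the least-allocated node is tried first; this is the
  // general shape of a usage-based lookup policy, though the real policy's
  // exact ordering may differ.
  static List<CandidateNode> sortByUsage(List<CandidateNode> nodes) {
    return nodes.stream()
        .sorted(Comparator.comparingLong((CandidateNode n) -> n.allocatedMemMb))
        .collect(Collectors.toList());
  }
}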
File: TestDistributedShell.java (package org.apache.hadoop.yarn.applications.distributedshell)

@@ -141,18 +141,21 @@ public class TestDistributedShell {
@Before
public void setup() throws Exception {
-setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion());
+setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion(),
+new YarnConfiguration());
}
-protected void setupInternal(int numNodeManager) throws Exception {
-setupInternal(numNodeManager, DEFAULT_TIMELINE_VERSION);
+protected void setupInternal(int numNodeManager,
+YarnConfiguration yarnConfig) throws Exception {
+setupInternal(numNodeManager, DEFAULT_TIMELINE_VERSION, yarnConfig);
}
-private void setupInternal(int numNodeManager, float timelineVersion)
+private void setupInternal(int numNodeManager, float timelineVersion,
+YarnConfiguration yarnConfig)
throws Exception {
LOG.info("Starting up YARN cluster");
-conf = new YarnConfiguration();
+this.conf = yarnConfig;
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
MIN_ALLOCATION_MB);
// reduce the teardown waiting time

File: AppPlacementAllocator.java (package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement)

@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
@@ -26,9 +27,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Iterator;
@@ -59,14 +63,35 @@ public abstract class AppPlacementAllocator<N extends SchedulerNode> {
protected SchedulerRequestKey schedulerRequestKey;
protected RMContext rmContext;
private AtomicInteger placementAttempt = new AtomicInteger(0);
+private MultiNodeSortingManager<N> multiNodeSortingManager = null;
+private String multiNodeSortPolicyName;
+private static final Logger LOG =
+LoggerFactory.getLogger(AppPlacementAllocator.class);
/**
* Get iterator of preferred node depends on requirement and/or availability.
* @param candidateNodeSet input CandidateNodeSet
* @return iterator of preferred node
*/
-public abstract Iterator<N> getPreferredNodeIterator(
-CandidateNodeSet<N> candidateNodeSet);
+public Iterator<N> getPreferredNodeIterator(
+CandidateNodeSet<N> candidateNodeSet) {
+// Now only handle the case that single node in the candidateNodeSet
+// TODO, Add support to multi-hosts inside candidateNodeSet which is passed
+// in.
+N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
+if (singleNode != null) {
+return IteratorUtils.singletonIterator(singleNode);
+}
+// singleNode will be null if Multi-node placement lookup is enabled, and
+// hence could consider sorting policies.
+return multiNodeSortingManager.getMultiNodeSortIterator(
+candidateNodeSet.getAllNodes().values(),
+candidateNodeSet.getPartition(),
+multiNodeSortPolicyName);
+}
/**
* Replace existing pending asks by the new requests
@@ -200,6 +225,17 @@ public abstract class AppPlacementAllocator<N extends SchedulerNode> {
this.appSchedulingInfo = appSchedulingInfo;
this.rmContext = rmContext;
this.schedulerRequestKey = schedulerRequestKey;
+multiNodeSortPolicyName = appSchedulingInfo
+.getApplicationSchedulingEnvs().get(
+ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS);
+multiNodeSortingManager = (MultiNodeSortingManager<N>) rmContext
+.getMultiNodeSortingManager();
+if (LOG.isDebugEnabled()) {
+LOG.debug(
+"nodeLookupPolicy used for " + appSchedulingInfo.getApplicationId()
++ " is " + ((multiNodeSortPolicyName != null)
+? multiNodeSortPolicyName : ""));
+}
}
/**

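The net effect of the two hunks above is that the single-node versus multi-node decision, previously implemented only in LocalityAppPlacementAllocator, now lives once in the AppPlacementAllocator base class and is inherited by every allocator, including SingleConstraintAppPlacementAllocator. A minimal, self-contained sketch of that dispatch pattern follows; the generic type and the sorter function are simplified stand-ins for CandidateNodeSet and MultiNodeSortingManager, not the real YARN types.

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Function;

class PreferredNodeDispatchSketch<N> {
  // Stand-in for the configured sorting policy; in the real code this role is
  // played by MultiNodeSortingManager with the per-app policy name.
  private final Function<Collection<N>, Iterator<N>> multiNodeSorter;

  PreferredNodeDispatchSketch(Function<Collection<N>, Iterator<N>> multiNodeSorter) {
    this.multiNodeSorter = multiNodeSorter;
  }

  Iterator<N> getPreferredNodeIterator(Collection<N> candidates) {
    // Single-node case: the scheduler offered exactly one node (typically the
    // heartbeating node), so return just that node.
    if (candidates.size() == 1) {
      return Collections.singletonList(candidates.iterator().next()).iterator();
    }
    // Multi-node case: let the sorting policy decide the order in which the
    // candidate nodes are tried.
    return multiNodeSorter.apply(candidates);
  }
}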
File: LocalityAppPlacementAllocator.java (package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement)

@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
-import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,14 +31,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -60,8 +57,6 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
new ConcurrentHashMap<>();
private volatile String primaryRequestedPartition =
RMNodeLabelsManager.NO_LABEL;
-private MultiNodeSortingManager<N> multiNodeSortingManager = null;
-private String multiNodeSortPolicyName;
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
@@ -77,40 +72,6 @@
public void initialize(AppSchedulingInfo appSchedulingInfo,
SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
-multiNodeSortPolicyName = appSchedulingInfo
-.getApplicationSchedulingEnvs().get(
-ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS);
-multiNodeSortingManager = (MultiNodeSortingManager<N>) rmContext
-.getMultiNodeSortingManager();
-if (LOG.isDebugEnabled()) {
-LOG.debug(
-"nodeLookupPolicy used for " + appSchedulingInfo
-.getApplicationId()
-+ " is " + ((multiNodeSortPolicyName != null) ?
-multiNodeSortPolicyName :
-""));
-}
}
-@Override
-@SuppressWarnings("unchecked")
-public Iterator<N> getPreferredNodeIterator(
-CandidateNodeSet<N> candidateNodeSet) {
-// Now only handle the case that single node in the candidateNodeSet
-// TODO, Add support to multi-hosts inside candidateNodeSet which is passed
-// in.
-N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
-if (singleNode != null) {
-return IteratorUtils.singletonIterator(singleNode);
-}
-// singleNode will be null if Multi-node placement lookup is enabled, and
-// hence could consider sorting policies.
-return multiNodeSortingManager.getMultiNodeSortIterator(
-candidateNodeSet.getAllNodes().values(),
-candidateNodeSet.getPartition(),
-multiNodeSortPolicyName);
-}
private boolean hasRequestLabelChanged(ResourceRequest requestOne,

File: SingleConstraintAppPlacementAllocator.java (package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement)

@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +45,6 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -78,22 +76,6 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
writeLock = lock.writeLock();
}
-@Override
-@SuppressWarnings("unchecked")
-public Iterator<N> getPreferredNodeIterator(
-CandidateNodeSet<N> candidateNodeSet) {
-// Now only handle the case that single node in the candidateNodeSet
-// TODO, Add support to multi-hosts inside candidateNodeSet which is passed
-// in.
-N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
-if (null != singleNode) {
-return IteratorUtils.singletonIterator(singleNode);
-}
-return IteratorUtils.emptyIterator();
-}
@Override
public PendingAskUpdateResult updatePendingAsk(
Collection<ResourceRequest> requests,
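With the override above removed, SingleConstraintAppPlacementAllocator no longer falls back to an empty iterator when the candidate set holds more than one node; it inherits the base-class logic shown earlier, so SchedulingRequest/placement-constraint asks can be matched against a sorted list of candidates when multi-node placement is enabled. The loop below is a toy, hypothetical illustration of what a caller does with that iterator (first fit over the preferred order); none of these names are actual YARN APIs.

import java.util.Iterator;
import java.util.Optional;
import java.util.function.Predicate;

class FirstFitSketch {
  // Walk the preferred-node iterator and place the ask on the first node
  // that passes the combined placement-constraint and free-resource check.
  static <N> Optional<N> allocateOnFirstFit(Iterator<N> preferredNodes,
      Predicate<N> satisfiesAsk) {
    while (preferredNodes.hasNext()) {
      N node = preferredNodes.next();
      if (satisfiesAsk.test(node)) {
        return Optional.of(node);
      }
    }
    return Optional.empty();
  }
}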