YARN-11026. Make default AppPlacementAllocator configurable in AppSch… (#3741)
* YARN-11026. Make default AppPlacementAllocator configurable in AppSchedulingInfo Co-authored-by: Minni Mittal <mimittal@microsoft.com>
This commit is contained in:
parent
089e06de21
commit
e8f767f2f4
|
@ -284,6 +284,10 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME =
|
public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME =
|
||||||
false;
|
false;
|
||||||
|
|
||||||
|
/** Configure default application placement allocator. */
|
||||||
|
public static final String APPLICATION_PLACEMENT_TYPE_CLASS =
|
||||||
|
YARN_PREFIX + "scheduler.app-placement-allocator.class";
|
||||||
|
|
||||||
/** Configured scheduler queue placement rules. */
|
/** Configured scheduler queue placement rules. */
|
||||||
public static final String QUEUE_PLACEMENT_RULES = YARN_PREFIX
|
public static final String QUEUE_PLACEMENT_RULES = YARN_PREFIX
|
||||||
+ "scheduler.queue-placement-rules";
|
+ "scheduler.queue-placement-rules";
|
||||||
|
|
|
@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -105,6 +106,8 @@ public class AppSchedulingInfo {
|
||||||
private final int retryAttempts;
|
private final int retryAttempts;
|
||||||
private boolean unmanagedAM;
|
private boolean unmanagedAM;
|
||||||
|
|
||||||
|
private final String defaultResourceRequestAppPlacementType;
|
||||||
|
|
||||||
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
|
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
|
||||||
Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
|
Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
|
||||||
ResourceUsage appResourceUsage,
|
ResourceUsage appResourceUsage,
|
||||||
|
@ -129,6 +132,31 @@ public class AppSchedulingInfo {
|
||||||
updateContext = new ContainerUpdateContext(this);
|
updateContext = new ContainerUpdateContext(this);
|
||||||
readLock = lock.readLock();
|
readLock = lock.readLock();
|
||||||
writeLock = lock.writeLock();
|
writeLock = lock.writeLock();
|
||||||
|
|
||||||
|
this.defaultResourceRequestAppPlacementType =
|
||||||
|
getDefaultResourceRequestAppPlacementType();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set default App Placement Allocator.
|
||||||
|
*
|
||||||
|
* @return app placement class.
|
||||||
|
*/
|
||||||
|
public String getDefaultResourceRequestAppPlacementType() {
|
||||||
|
if (this.rmContext != null
|
||||||
|
&& this.rmContext.getYarnConfiguration() != null) {
|
||||||
|
|
||||||
|
String appPlacementClass = applicationSchedulingEnvs.get(
|
||||||
|
ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS);
|
||||||
|
if (null != appPlacementClass) {
|
||||||
|
return appPlacementClass;
|
||||||
|
} else {
|
||||||
|
Configuration conf = rmContext.getYarnConfiguration();
|
||||||
|
return conf.get(
|
||||||
|
YarnConfiguration.APPLICATION_PLACEMENT_TYPE_CLASS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ApplicationId getApplicationId() {
|
public ApplicationId getApplicationId() {
|
||||||
|
@ -331,8 +359,7 @@ public class AppSchedulingInfo {
|
||||||
SchedulerRequestKey schedulerRequestKey = entry.getKey();
|
SchedulerRequestKey schedulerRequestKey = entry.getKey();
|
||||||
AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
|
AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
|
||||||
getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey,
|
getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey,
|
||||||
applicationSchedulingEnvs.get(
|
defaultResourceRequestAppPlacementType);
|
||||||
ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS));
|
|
||||||
|
|
||||||
// Update AppPlacementAllocator
|
// Update AppPlacementAllocator
|
||||||
PendingAskUpdateResult pendingAmountChanges =
|
PendingAskUpdateResult pendingAmountChanges =
|
||||||
|
|
|
@ -20,9 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
|
||||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -175,4 +178,52 @@ public class TestAppSchedulingInfo {
|
||||||
info.updateResourceRequests(reqs, false);
|
info.updateResourceRequests(reqs, false);
|
||||||
Assert.assertEquals(0, info.getSchedulerKeys().size());
|
Assert.assertEquals(0, info.getSchedulerKeys().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplicationPlacementType() {
|
||||||
|
String DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS =
|
||||||
|
LocalityAppPlacementAllocator.class.getName();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
RMContext rmContext = mock(RMContext.class);
|
||||||
|
when(rmContext.getYarnConfiguration()).thenReturn(conf);
|
||||||
|
ApplicationId appIdImpl = ApplicationId.newInstance(0, 1);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
ApplicationAttemptId.newInstance(appIdImpl, 1);
|
||||||
|
Queue queue = mock(Queue.class);
|
||||||
|
AppSchedulingInfo info = new AppSchedulingInfo(appAttemptId, "test", queue,
|
||||||
|
mock(ActiveUsersManager.class), 0, new ResourceUsage(), new HashMap<>(),
|
||||||
|
rmContext, false);
|
||||||
|
Assert.assertEquals(info.getApplicationSchedulingEnvs(), new HashMap<>());
|
||||||
|
// This should return null as nothing is set in the conf.
|
||||||
|
Assert.assertNull(info.getDefaultResourceRequestAppPlacementType());
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.APPLICATION_PLACEMENT_TYPE_CLASS,
|
||||||
|
DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS);
|
||||||
|
when(rmContext.getYarnConfiguration()).thenReturn(conf);
|
||||||
|
info = new AppSchedulingInfo(appAttemptId, "test", queue,
|
||||||
|
mock(ActiveUsersManager.class), 0, new ResourceUsage(), new HashMap<>(),
|
||||||
|
rmContext, false);
|
||||||
|
Assert.assertEquals(info.getDefaultResourceRequestAppPlacementType(),
|
||||||
|
DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplicationPlacementTypeNotConfigured() {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
RMContext rmContext = mock(RMContext.class);
|
||||||
|
when(rmContext.getYarnConfiguration()).thenReturn(conf);
|
||||||
|
ApplicationId appIdImpl = ApplicationId.newInstance(0, 1);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
ApplicationAttemptId.newInstance(appIdImpl, 1);
|
||||||
|
Queue queue = mock(Queue.class);
|
||||||
|
HashMap<String, String> applicationSchedulingEnvs = new HashMap<>();
|
||||||
|
applicationSchedulingEnvs.put("APPLICATION_PLACEMENT_TYPE_CLASS",
|
||||||
|
LocalityAppPlacementAllocator.class.getName());
|
||||||
|
AppSchedulingInfo info = new AppSchedulingInfo(appAttemptId, "test", queue,
|
||||||
|
mock(ActiveUsersManager.class), 0, new ResourceUsage(),
|
||||||
|
applicationSchedulingEnvs, rmContext, false);
|
||||||
|
// This should be set from applicationSchedulingEnvs
|
||||||
|
Assert.assertEquals(info.getDefaultResourceRequestAppPlacementType(),
|
||||||
|
LocalityAppPlacementAllocator.class.getName());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue