YARN-7666. Introduce scheduler specific environment variable support in ApplicationSubmissionContext for better scheduling placement configurations. (Sunil G via wangda)

Change-Id: I0fd826490f5160d47d42af2a9ac0bd8ec4e959dc
This commit is contained in:
Wangda Tan 2018-01-05 15:12:04 -08:00
parent 2aa4f0a559
commit a81144daa0
14 changed files with 279 additions and 27 deletions

View File

@ -598,4 +598,25 @@ public abstract class ApplicationSubmissionContext {
@Unstable
public abstract void setApplicationTimeouts(
Map<ApplicationTimeoutType, Long> applicationTimeouts);
/**
* Get application scheduling environment variables stored as a key value
* pair map for application.
*
* @return placement envs for application.
*/
@Public
@Unstable
public abstract Map<String, String> getApplicationSchedulingPropertiesMap();
/**
* Set the scheduling envs for the application.
*
* @param schedulingEnvMap
* A map of env's for the application scheduling preferences.
*/
@Public
@Unstable
public abstract void setApplicationSchedulingPropertiesMap(
Map<String, String> schedulingEnvMap);
}

View File

@ -459,6 +459,7 @@ message ApplicationSubmissionContextProto {
optional string node_label_expression = 16;
repeated ResourceRequestProto am_container_resource_request = 17;
repeated ApplicationTimeoutMapProto application_timeouts = 18;
repeated StringStringMapProto application_scheduling_properties = 19;
}
enum ApplicationTimeoutTypeProto {

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
import com.google.protobuf.TextFormat;
@ -71,6 +72,7 @@ extends ApplicationSubmissionContext {
private LogAggregationContext logAggregationContext = null;
private ReservationId reservationId = null;
private Map<ApplicationTimeoutType, Long> applicationTimeouts = null;
private Map<String, String> schedulingProperties = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
@ -141,6 +143,9 @@ extends ApplicationSubmissionContext {
if (this.applicationTimeouts != null) {
addApplicationTimeouts();
}
if (this.schedulingProperties != null) {
addApplicationSchedulingProperties();
}
}
private void mergeLocalToProto() {
@ -662,4 +667,71 @@ extends ApplicationSubmissionContext {
};
this.builder.addAllApplicationTimeouts(values);
}
private void addApplicationSchedulingProperties() {
maybeInitBuilder();
builder.clearApplicationSchedulingProperties();
if (this.schedulingProperties == null) {
return;
}
Iterable<? extends StringStringMapProto> values =
new Iterable<StringStringMapProto>() {
@Override
public Iterator<StringStringMapProto> iterator() {
return new Iterator<StringStringMapProto>() {
private Iterator<String> iterator = schedulingProperties.keySet()
.iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public StringStringMapProto next() {
String key = iterator.next();
return StringStringMapProto.newBuilder()
.setValue(schedulingProperties.get(key)).setKey(key).build();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
this.builder.addAllApplicationSchedulingProperties(values);
}
private void initApplicationSchedulingProperties() {
if (this.schedulingProperties != null) {
return;
}
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
List<StringStringMapProto> properties = p
.getApplicationSchedulingPropertiesList();
this.schedulingProperties = new HashMap<String, String>(properties.size());
for (StringStringMapProto envProto : properties) {
this.schedulingProperties.put(envProto.getKey(), envProto.getValue());
}
}
@Override
public Map<String, String> getApplicationSchedulingPropertiesMap() {
initApplicationSchedulingProperties();
return this.schedulingProperties;
}
@Override
public void setApplicationSchedulingPropertiesMap(
Map<String, String> schedulingPropertyMap) {
if (schedulingPropertyMap == null) {
return;
}
initApplicationSchedulingProperties();
this.schedulingProperties.clear();
this.schedulingProperties.putAll(schedulingPropertyMap);
}
}

View File

@ -311,4 +311,10 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* @return ApplicationPlacementContext
*/
ApplicationPlacementContext getApplicationPlacementContext();
/**
* Get the application scheduling environment variables.
* @return Map of envs related to application scheduling preferences.
*/
Map<String, String> getApplicationSchedulingEnvs();
}

View File

@ -82,7 +82,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement
.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@ -151,6 +150,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private final Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
private final String applicationType;
private final Set<String> applicationTags;
private Map<String, String> applicationSchedulingEnvs = new HashMap<>();
private final long attemptFailuresValidityInterval;
private boolean amBlacklistingEnabled = false;
@ -489,6 +489,15 @@ public class RMAppImpl implements RMApp, Recoverable {
this.placementContext = placementContext;
// If applications are not explicitly specifying envs, try to pull from
// AM container environment lists.
if(submissionContext.getAMContainerSpec() != null) {
applicationSchedulingEnvs
.putAll(submissionContext.getAMContainerSpec().getEnvironment());
}
applicationSchedulingEnvs
.putAll(submissionContext.getApplicationSchedulingPropertiesMap());
long localLogAggregationStatusTimeout =
conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
@ -2029,10 +2038,14 @@ public class RMAppImpl implements RMApp, Recoverable {
/**
* Clear Unused fields to free memory.
* @param app
*/
private void clearUnusedFields() {
this.submissionContext.setAMContainerSpec(null);
this.submissionContext.setLogAggregationContext(null);
}
@Override
public Map<String, String> getApplicationSchedulingEnvs() {
return this.applicationSchedulingEnvs;
}
}

View File

@ -45,10 +45,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -90,10 +90,12 @@ public class AppSchedulingInfo {
private final ReentrantReadWriteLock.WriteLock writeLock;
public final ContainerUpdateContext updateContext;
public final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
long epoch, ResourceUsage appResourceUsage) {
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
ResourceUsage appResourceUsage,
Map<String, String> applicationSchedulingEnvs) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
@ -102,6 +104,7 @@ public class AppSchedulingInfo {
this.containerIdCounter = new AtomicLong(
epoch << ResourceManager.EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage;
this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
updateContext = new ContainerUpdateContext(this);
@ -214,21 +217,24 @@ public class AppSchedulingInfo {
dedupRequests.entrySet()) {
SchedulerRequestKey schedulerRequestKey = entry.getKey();
if (!schedulerKeyToAppPlacementAllocator.containsKey(
schedulerRequestKey)) {
if (!schedulerKeyToAppPlacementAllocator
.containsKey(schedulerRequestKey)) {
AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = ApplicationPlacementFactory
.getAppPlacementAllocator(applicationSchedulingEnvs
.get(ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS));
placementAllocatorInstance.setAppSchedulingInfo(this);
schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey,
new LocalityAppPlacementAllocator<>(this));
placementAllocatorInstance);
}
// Update AppPlacementAllocator
PendingAskUpdateResult pendingAmountChanges =
schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey)
.updatePendingAsk(entry.getValue().values(),
PendingAskUpdateResult pendingAmountChanges = schedulerKeyToAppPlacementAllocator
.get(schedulerRequestKey).updatePendingAsk(entry.getValue().values(),
recoverPreemptedRequestForAContainer);
if (null != pendingAmountChanges) {
updatePendingResources(
pendingAmountChanges, schedulerRequestKey,
updatePendingResources(pendingAmountChanges, schedulerRequestKey,
queue.getMetrics());
offswitchResourcesUpdated = true;
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
/**
* Factory class to build various application placement policies.
*/
@Public
@Unstable
public class ApplicationPlacementFactory {
/**
* Get AppPlacementAllocator related to the placement type requested.
*
* @param appPlacementAllocatorName
* allocator class name.
* @return Specific AppPlacementAllocator instance based on type
*/
public static AppPlacementAllocator<SchedulerNode> getAppPlacementAllocator(
String appPlacementAllocatorName) {
Class<?> policyClass;
try {
if (appPlacementAllocatorName == null) {
policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
} else {
policyClass = Class.forName(appPlacementAllocatorName);
}
} catch (ClassNotFoundException e) {
policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
}
if (!AppPlacementAllocator.class.isAssignableFrom(policyClass)) {
policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
}
@SuppressWarnings("unchecked")
AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = (AppPlacementAllocator<SchedulerNode>) ReflectionUtils
.newInstance(policyClass, null);
return placementAllocatorInstance;
}
}

View File

@ -197,6 +197,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
protected ReentrantReadWriteLock.ReadLock readLock;
protected ReentrantReadWriteLock.WriteLock writeLock;
private Map<String, String> applicationSchedulingEnvs = new HashMap<>();
// Not confirmed allocation resource, will be used to avoid too many proposal
// rejected because of duplicated allocation
private AtomicLong unconfirmedAllocatedMem = new AtomicLong();
@ -207,9 +209,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
RMContext rmContext) {
Preconditions.checkNotNull(rmContext, "RMContext should not be null");
this.rmContext = rmContext;
this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue,
abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage);
this.queue = queue;
this.pendingRelease = Collections.newSetFromMap(
new ConcurrentHashMap<ContainerId, Boolean>());
@ -227,8 +226,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
this.logAggregationContext =
appSubmissionContext.getLogAggregationContext();
}
applicationSchedulingEnvs = rmApp.getApplicationSchedulingEnvs();
}
this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user,
queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage,
applicationSchedulingEnvs);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
@ -1434,4 +1437,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return diagnosticMessage;
}
}
public Map<String, String> getApplicationSchedulingEnvs() {
return this.applicationSchedulingEnvs;
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
/**
* This class will keep all Scheduling env's names which will help in
* placement calculations.
*/
public class ApplicationSchedulingConfig {
@InterfaceAudience.Private
public static final String ENV_APPLICATION_PLACEMENT_TYPE_CLASS =
"APPLICATION_PLACEMENT_TYPE_CLASS";
@InterfaceAudience.Private
public static final Class<? extends AppPlacementAllocator>
DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class;
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@ -157,4 +158,12 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
* Print human-readable requests to LOG debug.
*/
void showRequests();
/**
* Set app scheduling info.
*
* @param appSchedulingInfo
* app info object.
*/
void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo);
}

View File

@ -66,6 +66,12 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
this.appSchedulingInfo = info;
}
public LocalityAppPlacementAllocator() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
@Override
@SuppressWarnings("unchecked")
public Iterator<N> getPreferredNodeIterator(
@ -419,4 +425,9 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
writeLock.unlock();
}
}
@Override
public void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo) {
this.appSchedulingInfo = appSchedulingInfo;
}
}

View File

@ -254,6 +254,11 @@ public abstract class MockAsm extends MockApps {
public CollectorInfo getCollectorInfo() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Map<String, String> getApplicationSchedulingEnvs() {
throw new UnsupportedOperationException("Not supported yet.");
}
}
public static RMApp newApplication(int i) {

View File

@ -338,4 +338,9 @@ public class MockRMApp implements RMApp {
public CollectorInfo getCollectorInfo() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Map<String, String> getApplicationSchedulingEnvs() {
return null;
}
}

View File

@ -21,10 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.*;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -47,8 +44,9 @@ public class TestAppSchedulingInfo {
FSLeafQueue queue = mock(FSLeafQueue.class);
doReturn("test").when(queue).getQueueName();
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
appAttemptId, "test", queue, null, 0, new ResourceUsage());
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(appAttemptId,
"test", queue, null, 0, new ResourceUsage(),
new HashMap<String, String>());
appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
new ArrayList<String>());
@ -120,7 +118,7 @@ public class TestAppSchedulingInfo {
doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
AppSchedulingInfo info = new AppSchedulingInfo(
appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
new ResourceUsage());
new ResourceUsage(), new HashMap<String, String>());
Assert.assertEquals(0, info.getSchedulerKeys().size());
Priority pri1 = Priority.newInstance(1);