From a81144daa012e13590725f67f53e35ef84a6f1ec Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 5 Jan 2018 15:12:04 -0800 Subject: [PATCH] YARN-7666. Introduce scheduler specific environment variable support in ApplicationSubmissionContext for better scheduling placement configurations. (Sunil G via wangda) Change-Id: I0fd826490f5160d47d42af2a9ac0bd8ec4e959dc --- .../records/ApplicationSubmissionContext.java | 21 ++++++ .../src/main/proto/yarn_protos.proto | 1 + .../ApplicationSubmissionContextPBImpl.java | 74 ++++++++++++++++++- .../server/resourcemanager/rmapp/RMApp.java | 6 ++ .../resourcemanager/rmapp/RMAppImpl.java | 17 ++++- .../scheduler/AppSchedulingInfo.java | 34 +++++---- .../ApplicationPlacementFactory.java | 63 ++++++++++++++++ .../SchedulerApplicationAttempt.java | 13 +++- .../common/ApplicationSchedulingConfig.java | 35 +++++++++ .../placement/AppPlacementAllocator.java | 9 +++ .../LocalityAppPlacementAllocator.java | 11 +++ .../applicationsmanager/MockAsm.java | 5 ++ .../resourcemanager/rmapp/MockRMApp.java | 5 ++ .../scheduler/TestAppSchedulingInfo.java | 12 ++- 14 files changed, 279 insertions(+), 27 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 38db60cd9e1..d2adfdc478a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -598,4 +598,25 @@ public abstract class ApplicationSubmissionContext { @Unstable public abstract void setApplicationTimeouts( Map applicationTimeouts); + + /** + * Get application scheduling environment variables stored as a key value + * pair map for application. + * + * @return placement envs for application. + */ + @Public + @Unstable + public abstract Map getApplicationSchedulingPropertiesMap(); + + /** + * Set the scheduling envs for the application. + * + * @param schedulingEnvMap + * A map of env's for the application scheduling preferences. + */ + @Public + @Unstable + public abstract void setApplicationSchedulingPropertiesMap( + Map schedulingEnvMap); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 3a9662b2540..b6ea5f915a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -459,6 +459,7 @@ message ApplicationSubmissionContextProto { optional string node_label_expression = 16; repeated ResourceRequestProto am_container_resource_request = 17; repeated ApplicationTimeoutMapProto application_timeouts = 18; + repeated StringStringMapProto application_scheduling_properties = 19; } enum ApplicationTimeoutTypeProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index e3dbf4055be..0c91e1844ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto; import com.google.protobuf.TextFormat; @@ -71,6 +72,7 @@ extends ApplicationSubmissionContext { private LogAggregationContext logAggregationContext = null; private ReservationId reservationId = null; private Map applicationTimeouts = null; + private Map schedulingProperties = null; public ApplicationSubmissionContextPBImpl() { builder = ApplicationSubmissionContextProto.newBuilder(); @@ -141,6 +143,9 @@ extends ApplicationSubmissionContext { if (this.applicationTimeouts != null) { addApplicationTimeouts(); } + if (this.schedulingProperties != null) { + addApplicationSchedulingProperties(); + } } private void mergeLocalToProto() { @@ -662,4 +667,71 @@ extends ApplicationSubmissionContext { }; this.builder.addAllApplicationTimeouts(values); } -} + + private void addApplicationSchedulingProperties() { + maybeInitBuilder(); + builder.clearApplicationSchedulingProperties(); + if (this.schedulingProperties == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iterator = schedulingProperties.keySet() + .iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public StringStringMapProto next() { + String key = iterator.next(); + return StringStringMapProto.newBuilder() + .setValue(schedulingProperties.get(key)).setKey(key).build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationSchedulingProperties(values); + } + + private void initApplicationSchedulingProperties() { + if (this.schedulingProperties != null) { + return; + } + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; + List properties = p + .getApplicationSchedulingPropertiesList(); + this.schedulingProperties = new HashMap(properties.size()); + for (StringStringMapProto envProto : properties) { + this.schedulingProperties.put(envProto.getKey(), envProto.getValue()); + } + } + + @Override + public Map getApplicationSchedulingPropertiesMap() { + initApplicationSchedulingProperties(); + return this.schedulingProperties; + } + + @Override + public void setApplicationSchedulingPropertiesMap( + Map schedulingPropertyMap) { + if (schedulingPropertyMap == null) { + return; + } + initApplicationSchedulingProperties(); + this.schedulingProperties.clear(); + this.schedulingProperties.putAll(schedulingPropertyMap); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 39321ccd2dd..e286834cd4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -311,4 +311,10 @@ public interface RMApp extends EventHandler { * @return ApplicationPlacementContext */ ApplicationPlacementContext getApplicationPlacementContext(); + + /** + * Get the application scheduling environment variables. + * @return Map of envs related to application scheduling preferences. + */ + Map getApplicationSchedulingEnvs(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 38f666b4c7b..714f0e136cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -82,7 +82,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; - import org.apache.hadoop.yarn.server.resourcemanager.placement .ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -151,6 +150,7 @@ public class RMAppImpl implements RMApp, Recoverable { private final Map updatedNodes = new HashMap<>(); private final String applicationType; private final Set applicationTags; + private Map applicationSchedulingEnvs = new HashMap<>(); private final long attemptFailuresValidityInterval; private boolean amBlacklistingEnabled = false; @@ -489,6 +489,15 @@ public class RMAppImpl implements RMApp, Recoverable { this.placementContext = placementContext; + // If applications are not explicitly specifying envs, try to pull from + // AM container environment lists. + if(submissionContext.getAMContainerSpec() != null) { + applicationSchedulingEnvs + .putAll(submissionContext.getAMContainerSpec().getEnvironment()); + } + applicationSchedulingEnvs + .putAll(submissionContext.getApplicationSchedulingPropertiesMap()); + long localLogAggregationStatusTimeout = conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS); @@ -2029,10 +2038,14 @@ public class RMAppImpl implements RMApp, Recoverable { /** * Clear Unused fields to free memory. - * @param app */ private void clearUnusedFields() { this.submissionContext.setAMContainerSpec(null); this.submissionContext.setLogAggregationContext(null); } + + @Override + public Map getApplicationSchedulingEnvs() { + return this.applicationSchedulingEnvs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index e47f0c1d614..8858d3b9d02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -45,10 +45,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; @@ -90,10 +90,12 @@ public class AppSchedulingInfo { private final ReentrantReadWriteLock.WriteLock writeLock; public final ContainerUpdateContext updateContext; + public final Map applicationSchedulingEnvs = new HashMap<>(); - public AppSchedulingInfo(ApplicationAttemptId appAttemptId, - String user, Queue queue, AbstractUsersManager abstractUsersManager, - long epoch, ResourceUsage appResourceUsage) { + public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, + Queue queue, AbstractUsersManager abstractUsersManager, long epoch, + ResourceUsage appResourceUsage, + Map applicationSchedulingEnvs) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; @@ -102,6 +104,7 @@ public class AppSchedulingInfo { this.containerIdCounter = new AtomicLong( epoch << ResourceManager.EPOCH_BIT_SHIFT); this.appResourceUsage = appResourceUsage; + this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); updateContext = new ContainerUpdateContext(this); @@ -211,24 +214,27 @@ public class AppSchedulingInfo { Map> dedupRequests) { boolean offswitchResourcesUpdated = false; for (Map.Entry> entry : - dedupRequests.entrySet()) { + dedupRequests.entrySet()) { SchedulerRequestKey schedulerRequestKey = entry.getKey(); - if (!schedulerKeyToAppPlacementAllocator.containsKey( - schedulerRequestKey)) { + if (!schedulerKeyToAppPlacementAllocator + .containsKey(schedulerRequestKey)) { + AppPlacementAllocator placementAllocatorInstance = ApplicationPlacementFactory + .getAppPlacementAllocator(applicationSchedulingEnvs + .get(ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS)); + placementAllocatorInstance.setAppSchedulingInfo(this); + schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey, - new LocalityAppPlacementAllocator<>(this)); + placementAllocatorInstance); } // Update AppPlacementAllocator - PendingAskUpdateResult pendingAmountChanges = - schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey) - .updatePendingAsk(entry.getValue().values(), - recoverPreemptedRequestForAContainer); + PendingAskUpdateResult pendingAmountChanges = schedulerKeyToAppPlacementAllocator + .get(schedulerRequestKey).updatePendingAsk(entry.getValue().values(), + recoverPreemptedRequestForAContainer); if (null != pendingAmountChanges) { - updatePendingResources( - pendingAmountChanges, schedulerRequestKey, + updatePendingResources(pendingAmountChanges, schedulerRequestKey, queue.getMetrics()); offswitchResourcesUpdated = true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java new file mode 100644 index 00000000000..40c8d054eda --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; + +/** + * Factory class to build various application placement policies. + */ +@Public +@Unstable +public class ApplicationPlacementFactory { + + /** + * Get AppPlacementAllocator related to the placement type requested. + * + * @param appPlacementAllocatorName + * allocator class name. + * @return Specific AppPlacementAllocator instance based on type + */ + public static AppPlacementAllocator getAppPlacementAllocator( + String appPlacementAllocatorName) { + Class policyClass; + try { + if (appPlacementAllocatorName == null) { + policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; + } else { + policyClass = Class.forName(appPlacementAllocatorName); + } + } catch (ClassNotFoundException e) { + policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; + } + + if (!AppPlacementAllocator.class.isAssignableFrom(policyClass)) { + policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS; + } + + @SuppressWarnings("unchecked") + AppPlacementAllocator placementAllocatorInstance = (AppPlacementAllocator) ReflectionUtils + .newInstance(policyClass, null); + return placementAllocatorInstance; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index dfb0e67fcd9..f02f113e9dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -197,6 +197,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { protected ReentrantReadWriteLock.ReadLock readLock; protected ReentrantReadWriteLock.WriteLock writeLock; + private Map applicationSchedulingEnvs = new HashMap<>(); + // Not confirmed allocation resource, will be used to avoid too many proposal // rejected because of duplicated allocation private AtomicLong unconfirmedAllocatedMem = new AtomicLong(); @@ -207,9 +209,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { RMContext rmContext) { Preconditions.checkNotNull(rmContext, "RMContext should not be null"); this.rmContext = rmContext; - this.appSchedulingInfo = - new AppSchedulingInfo(applicationAttemptId, user, queue, - abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage); this.queue = queue; this.pendingRelease = Collections.newSetFromMap( new ConcurrentHashMap()); @@ -227,8 +226,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { this.logAggregationContext = appSubmissionContext.getLogAggregationContext(); } + applicationSchedulingEnvs = rmApp.getApplicationSchedulingEnvs(); } + this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, + queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage, + applicationSchedulingEnvs); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); @@ -1434,4 +1437,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return diagnosticMessage; } } + + public Map getApplicationSchedulingEnvs() { + return this.applicationSchedulingEnvs; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java new file mode 100644 index 00000000000..1bd37431c15 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; +/** + * This class will keep all Scheduling env's names which will help in + * placement calculations. + */ +public class ApplicationSchedulingConfig { + @InterfaceAudience.Private + public static final String ENV_APPLICATION_PLACEMENT_TYPE_CLASS = + "APPLICATION_PLACEMENT_TYPE_CLASS"; + + @InterfaceAudience.Private + public static final Class + DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java index dcb38aa0054..5c49450d7ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -157,4 +158,12 @@ public interface AppPlacementAllocator { * Print human-readable requests to LOG debug. */ void showRequests(); + + /** + * Set app scheduling info. + * + * @param appSchedulingInfo + * app info object. + */ + void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index 766827ca300..be1c1cc8580 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -66,6 +66,12 @@ public class LocalityAppPlacementAllocator this.appSchedulingInfo = info; } + public LocalityAppPlacementAllocator() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + @Override @SuppressWarnings("unchecked") public Iterator getPreferredNodeIterator( @@ -419,4 +425,9 @@ public class LocalityAppPlacementAllocator writeLock.unlock(); } } + + @Override + public void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo) { + this.appSchedulingInfo = appSchedulingInfo; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 2aca3752013..72de27cf95a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -254,6 +254,11 @@ public abstract class MockAsm extends MockApps { public CollectorInfo getCollectorInfo() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Map getApplicationSchedulingEnvs() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 756759957e6..c399368593a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -338,4 +338,9 @@ public class MockRMApp implements RMApp { public CollectorInfo getCollectorInfo() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Map getApplicationSchedulingEnvs() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index bb29889a854..3692b29c8bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -21,10 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.TreeSet; +import java.util.*; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -47,8 +44,9 @@ public class TestAppSchedulingInfo { FSLeafQueue queue = mock(FSLeafQueue.class); doReturn("test").when(queue).getQueueName(); - AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo( - appAttemptId, "test", queue, null, 0, new ResourceUsage()); + AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(appAttemptId, + "test", queue, null, 0, new ResourceUsage(), + new HashMap()); appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList(), new ArrayList()); @@ -120,7 +118,7 @@ public class TestAppSchedulingInfo { doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); AppSchedulingInfo info = new AppSchedulingInfo( appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0, - new ResourceUsage()); + new ResourceUsage(), new HashMap()); Assert.assertEquals(0, info.getSchedulerKeys().size()); Priority pri1 = Priority.newInstance(1);