From 25bc68d15eb005180ab366e3b9470294018bd2b9 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Sat, 11 Jan 2014 07:07:17 +0000 Subject: [PATCH] YARN-1041. Added the ApplicationMasterProtocol API for applications to use the ability in ResourceManager to optionally not kill containers when the ApplicationMaster exits. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1557318 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 4 + .../RegisterApplicationMasterResponse.java | 33 +- .../src/main/proto/yarn_service_protos.proto | 1 + ...gisterApplicationMasterResponsePBImpl.java | 52 + .../ApplicationMasterService.java | 7 + .../scheduler/AbstractYarnScheduler.java | 64 + .../scheduler/capacity/CapacityScheduler.java | 13 +- .../scheduler/fair/FairScheduler.java | 14 +- .../scheduler/fair/FairScheduler.java.orig | 1361 +++++++++++++++++ .../scheduler/fifo/FifoScheduler.java | 13 +- .../applicationsmanager/TestAMRestart.java | 25 +- .../capacity/TestCapacityScheduler.java | 2 +- .../scheduler/fair/TestFairScheduler.java | 5 +- .../scheduler/fifo/TestFifoScheduler.java | 4 +- .../fifo/TestFifoScheduler.java.orig | 615 ++++++++ .../webapp/TestRMWebServicesApps.java | 32 +- 16 files changed, 2196 insertions(+), 49 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java.orig diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 11d34ca32c9..c14001f7b11 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -63,6 +63,10 @@ Release 2.4.0 - UNRELEASED YARN-1033. Expose RM active/standby state to Web UI and REST API (kasha) + YARN-1041. Added the ApplicationMasterProtocol API for applications to use the + ability in ResourceManager to optionally not kill containers when the + ApplicationMaster exits. (Jian He via vinodkv) + IMPROVEMENTS YARN-7. Support CPU resource for DistributedShell. 
(Junping Du via llu) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 9c817b318ce..2755a9dd998 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -27,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Records; @@ -47,16 +49,19 @@ import org.apache.hadoop.yarn.util.Records; @Public @Stable public abstract class RegisterApplicationMasterResponse { + @Private @Unstable public static RegisterApplicationMasterResponse newInstance( Resource minCapability, Resource maxCapability, - Map acls, ByteBuffer key) { + Map acls, ByteBuffer key, + List containersFromPreviousAttempt) { RegisterApplicationMasterResponse response = Records.newRecord(RegisterApplicationMasterResponse.class); response.setMaximumResourceCapability(maxCapability); response.setApplicationACLs(acls); response.setClientToAMTokenMasterKey(key); + response.setContainersFromPreviousAttempt(containersFromPreviousAttempt); return response; } @@ -105,4 +110,30 @@ public abstract class RegisterApplicationMasterResponse { @Public @Stable public abstract void setClientToAMTokenMasterKey(ByteBuffer key); + + /** + *

<p> + * Get the list of running containers as viewed by + * <code>ResourceManager</code> from previous application attempt. + * </p> +

+ * + * @return the list of running containers as viewed by + * ResourceManager from previous application attempt + */ + @Public + @Unstable + public abstract List getContainersFromPreviousAttempt(); + + /** + * Set the list of running containers as viewed by + * ResourceManager from previous application attempt. + * + * @param containersFromPreviousAttempt + * the list of running containers as viewed by + * ResourceManager from previous application attempt. + */ + @Private + @Unstable + public abstract void setContainersFromPreviousAttempt( + List containersFromPreviousAttempt); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a4631d11b6e..dc97eecdc7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -44,6 +44,7 @@ message RegisterApplicationMasterResponseProto { optional ResourceProto maximumCapability = 1; optional bytes client_to_am_token_master_key = 2; repeated ApplicationACLMapProto application_ACLs = 3; + repeated ContainerProto containers_from_previous_attempt = 4; } message FinishApplicationMasterRequestProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java index 486304c7fb1..0e593d336c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -29,10 +30,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder; @@ -52,6 +56,7 @@ public class RegisterApplicationMasterResponsePBImpl extends private Resource maximumResourceCapability; private Map applicationACLS = null; + private List containersFromPreviousAttempt = null; public RegisterApplicationMasterResponsePBImpl() { builder = 
RegisterApplicationMasterResponseProto.newBuilder(); @@ -105,6 +110,9 @@ public class RegisterApplicationMasterResponsePBImpl extends if (this.applicationACLS != null) { addApplicationACLs(); } + if (this.containersFromPreviousAttempt != null) { + addRunningContainersToProto(); + } } @@ -226,6 +234,43 @@ public class RegisterApplicationMasterResponsePBImpl extends ByteBuffer.wrap(builder.getClientToAmTokenMasterKey().toByteArray()); return key; } + + @Override + public List getContainersFromPreviousAttempt() { + if (this.containersFromPreviousAttempt != null) { + return this.containersFromPreviousAttempt; + } + initRunningContainersList(); + return this.containersFromPreviousAttempt; + } + + @Override + public void setContainersFromPreviousAttempt(final List containers) { + if (containers == null) { + return; + } + this.containersFromPreviousAttempt = new ArrayList(); + this.containersFromPreviousAttempt.addAll(containers); + } + + private void initRunningContainersList() { + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainersFromPreviousAttemptList(); + containersFromPreviousAttempt = new ArrayList(); + for (ContainerProto c : list) { + containersFromPreviousAttempt.add(convertFromProtoFormat(c)); + } + } + + private void addRunningContainersToProto() { + maybeInitBuilder(); + builder.clearContainersFromPreviousAttempt(); + List list = new ArrayList(); + for (Container c : containersFromPreviousAttempt) { + list.add(convertToProtoFormat(c)); + } + builder.addAllContainersFromPreviousAttempt(list); + } private Resource convertFromProtoFormat(ResourceProto resource) { return new ResourcePBImpl(resource); @@ -235,4 +280,11 @@ public class RegisterApplicationMasterResponsePBImpl extends return ((ResourcePBImpl)resource).getProto(); } + private ContainerPBImpl convertFromProtoFormat(ContainerProto p) { + return new ContainerPBImpl(p); + } + + private ContainerProto convertToProtoFormat(Container t) { + return ((ContainerPBImpl) t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 7a8b2a25069..761bdb14005 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionContainer; @@ -78,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -271,6 +273,11 @@ public class ApplicationMasterService extends AbstractService implements .getClientToAMTokenSecretManager() .getMasterKey(applicationAttemptId).getEncoded())); } + + List containerList = + ((AbstractYarnScheduler) rScheduler) + .getTransferredContainers(applicationAttemptId); + response.setContainersFromPreviousAttempt(containerList); return response; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java new file mode 100644 index 00000000000..e460f1cb5e5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +public class AbstractYarnScheduler { + + protected RMContext rmContext; + protected Map applications; + + public synchronized List getTransferredContainers( + ApplicationAttemptId currentAttempt) { + ApplicationId appId = currentAttempt.getApplicationId(); + SchedulerApplication app = applications.get(appId); + List containerList = new ArrayList(); + RMApp appImpl = this.rmContext.getRMApps().get(appId); + if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) { + return containerList; + } + Collection liveContainers = + app.getCurrentAppAttempt().getLiveContainers(); + ContainerId amContainerId = + rmContext.getRMApps().get(appId).getCurrentAppAttempt() + .getMasterContainer().getId(); + for (RMContainer rmContainer : liveContainers) { + if (!rmContainer.getContainerId().equals(amContainerId)) { + containerList.add(rmContainer.getContainer()); + } + } + return containerList; + } + + public Map getSchedulerApplications() { + return applications; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6035ee1bc7e..0197c5bf998 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -95,7 +96,7 @@ import com.google.common.annotations.VisibleForTesting; @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") -public class CapacityScheduler +public class CapacityScheduler extends AbstractYarnScheduler implements PreemptableResourceScheduler, CapacitySchedulerContext, Configurable { @@ -177,7 +178,6 @@ public class CapacityScheduler private CapacitySchedulerConfiguration conf; private Configuration yarnConf; - private RMContext 
rmContext; private Map queues = new ConcurrentHashMap(); @@ -191,10 +191,6 @@ public class CapacityScheduler private Resource minimumAllocation; private Resource maximumAllocation; - @VisibleForTesting - protected Map applications = - new ConcurrentHashMap(); - private boolean initialized = false; private ResourceCalculator calculator; @@ -271,9 +267,10 @@ public class CapacityScheduler this.maximumAllocation = this.conf.getMaximumAllocation(); this.calculator = this.conf.getResourceCalculator(); this.usePortForNodeName = this.conf.getUsePortForNodeName(); - + this.applications = + new ConcurrentHashMap(); this.rmContext = rmContext; - + initializeQueues(this.conf); initialized = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 9fc43299681..3ff3b04e63a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -120,10 +121,10 @@ import com.google.common.annotations.VisibleForTesting; @LimitedPrivate("yarn") @Unstable @SuppressWarnings("unchecked") -public class FairScheduler implements ResourceScheduler { +public class FairScheduler extends AbstractYarnScheduler implements + ResourceScheduler { private boolean initialized; private FairSchedulerConfiguration conf; - private RMContext rmContext; private Resource minimumAllocation; private Resource maximumAllocation; private Resource incrAllocation; @@ -157,11 +158,6 @@ public class FairScheduler implements ResourceScheduler { // Time we last ran preemptTasksIfNecessary private long lastPreemptCheckTime; - // This stores per-application scheduling information, - @VisibleForTesting - protected Map applications = - new ConcurrentHashMap(); - // Nodes in the cluster, indexed by NodeId private Map nodes = new ConcurrentHashMap(); @@ -1235,6 +1231,9 @@ public class FairScheduler implements ResourceScheduler { rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); this.rmContext = rmContext; + // This stores per-application scheduling information + this.applications = + new ConcurrentHashMap(); this.eventLog = new FairSchedulerEventLog(); eventLog.init(this.conf); @@ -1357,5 +1356,4 @@ public class FairScheduler implements ResourceScheduler { queue.collectSchedulerApplications(apps); return apps; } - } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig new file mode 100644 index 00000000000..9fc43299681 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java.orig @@ -0,0 +1,1361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; 
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A scheduler that schedules resources between a set of queues. The scheduler + * keeps track of the resources used by each queue, and attempts to maintain + * fairness by scheduling tasks at queues whose allocations are farthest below + * an ideal fair distribution. + * + * The fair scheduler supports hierarchical queues. All queues descend from a + * queue named "root". 
Available resources are distributed among the children + * of the root queue in the typical fair scheduling fashion. Then, the children + * distribute the resources assigned to them to their children in the same + * fashion. Applications may only be scheduled on leaf queues. Queues can be + * specified as children of other queues by placing them as sub-elements of their + * parents in the fair scheduler configuration file. + * + * A queue's name starts with the names of its parents, with periods as + * separators. So a queue named "queue1" under the root named, would be + * referred to as "root.queue1", and a queue named "queue2" under a queue + * named "parent1" would be referred to as "root.parent1.queue2". + */ +@LimitedPrivate("yarn") +@Unstable +@SuppressWarnings("unchecked") +public class FairScheduler implements ResourceScheduler { + private boolean initialized; + private FairSchedulerConfiguration conf; + private RMContext rmContext; + private Resource minimumAllocation; + private Resource maximumAllocation; + private Resource incrAllocation; + private QueueManager queueMgr; + private Clock clock; + private boolean usePortForNodeName; + + private static final Log LOG = LogFactory.getLog(FairScheduler.class); + + private static final ResourceCalculator RESOURCE_CALCULATOR = + new DefaultResourceCalculator(); + + // Value that container assignment methods return when a container is + // reserved + public static final Resource CONTAINER_RESERVED = Resources.createResource(-1); + + // How often fair shares are re-calculated (ms) + protected long UPDATE_INTERVAL = 500; + + private final static List EMPTY_CONTAINER_LIST = + new ArrayList(); + + private static final Allocation EMPTY_ALLOCATION = + new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0)); + + // Aggregate metrics + FSQueueMetrics rootMetrics; + + // Time when we last updated preemption vars + protected long lastPreemptionUpdateTime; + // Time we last ran preemptTasksIfNecessary + private long lastPreemptCheckTime; + + // This stores per-application scheduling information, + @VisibleForTesting + protected Map applications = + new ConcurrentHashMap(); + + // Nodes in the cluster, indexed by NodeId + private Map nodes = + new ConcurrentHashMap(); + + // Aggregate capacity of the cluster + private Resource clusterCapacity = + RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); + + // How often tasks are preempted + protected long preemptionInterval; + + // ms to wait before force killing stuff (must be longer than a couple + // of heartbeats to give task-kill commands a chance to act). + protected long waitTimeBeforeKill; + + // Containers whose AMs have been warned that they will be preempted soon. 
+ private List warnedContainers = new ArrayList(); + + protected boolean preemptionEnabled; + protected boolean sizeBasedWeight; // Give larger weights to larger jobs + protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster + protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not + protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling + private Comparator nodeAvailableResourceComparator = + new NodeAvailableResourceComparator(); // Node available resource comparator + protected double nodeLocalityThreshold; // Cluster threshold for node locality + protected double rackLocalityThreshold; // Cluster threshold for rack locality + protected long nodeLocalityDelayMs; // Delay for node locality + protected long rackLocalityDelayMs; // Delay for rack locality + private FairSchedulerEventLog eventLog; // Machine-readable event log + protected boolean assignMultiple; // Allocate multiple containers per + // heartbeat + protected int maxAssign; // Max containers to assign per heartbeat + + @VisibleForTesting + final MaxRunningAppsEnforcer maxRunningEnforcer; + + private AllocationFileLoaderService allocsLoader; + @VisibleForTesting + AllocationConfiguration allocConf; + + public FairScheduler() { + clock = new SystemClock(); + allocsLoader = new AllocationFileLoaderService(); + queueMgr = new QueueManager(this); + maxRunningEnforcer = new MaxRunningAppsEnforcer(this); + } + + private void validateConf(Configuration conf) { + // validate scheduler memory allocation setting + int minMem = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + int maxMem = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + + if (minMem < 0 || minMem > maxMem) { + throw new YarnRuntimeException("Invalid resource scheduler memory" + + " allocation configuration" + + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + + "=" + minMem + + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + + "=" + maxMem + ", min should equal greater than 0" + + ", max should be no smaller than min."); + } + + // validate scheduler vcores allocation setting + int minVcores = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + int maxVcores = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + + if (minVcores < 0 || minVcores > maxVcores) { + throw new YarnRuntimeException("Invalid resource scheduler vcores" + + " allocation configuration" + + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES + + "=" + minVcores + + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + + "=" + maxVcores + ", min should equal greater than 0" + + ", max should be no smaller than min."); + } + } + + public FairSchedulerConfiguration getConf() { + return conf; + } + + public QueueManager getQueueManager() { + return queueMgr; + } + + @Override + public RMContainer getRMContainer(ContainerId containerId) { + FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId); + return (attempt == null) ? 
null : attempt.getRMContainer(containerId); + } + + private FSSchedulerApp getCurrentAttemptForContainer( + ContainerId containerId) { + SchedulerApplication app = + applications.get(containerId.getApplicationAttemptId() + .getApplicationId()); + if (app != null) { + return (FSSchedulerApp) app.getCurrentAppAttempt(); + } + return null; + } + + /** + * A runnable which calls {@link FairScheduler#update()} every + * UPDATE_INTERVAL milliseconds. + */ + private class UpdateThread implements Runnable { + public void run() { + while (true) { + try { + Thread.sleep(UPDATE_INTERVAL); + update(); + preemptTasksIfNecessary(); + } catch (Exception e) { + LOG.error("Exception in fair scheduler UpdateThread", e); + } + } + } + } + + /** + * Recompute the internal variables used by the scheduler - per-job weights, + * fair shares, deficits, minimum slot allocations, and amount of used and + * required resources per job. + */ + protected synchronized void update() { + updatePreemptionVariables(); // Determine if any queues merit preemption + + FSQueue rootQueue = queueMgr.getRootQueue(); + + // Recursively update demands for all queues + rootQueue.updateDemand(); + + rootQueue.setFairShare(clusterCapacity); + // Recursively compute fair shares for all queues + // and update metrics + rootQueue.recomputeShares(); + } + + /** + * Update the preemption fields for all QueueScheduables, i.e. the times since + * each queue last was at its guaranteed share and at > 1/2 of its fair share + * for each type of task. + */ + private void updatePreemptionVariables() { + long now = clock.getTime(); + lastPreemptionUpdateTime = now; + for (FSLeafQueue sched : queueMgr.getLeafQueues()) { + if (!isStarvedForMinShare(sched)) { + sched.setLastTimeAtMinShare(now); + } + if (!isStarvedForFairShare(sched)) { + sched.setLastTimeAtHalfFairShare(now); + } + } + } + + /** + * Is a queue below its min share for the given task type? + */ + boolean isStarvedForMinShare(FSLeafQueue sched) { + Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + sched.getMinShare(), sched.getDemand()); + return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, + sched.getResourceUsage(), desiredShare); + } + + /** + * Is a queue being starved for fair share for the given task type? This is + * defined as being below half its fair share. + */ + boolean isStarvedForFairShare(FSLeafQueue sched) { + Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); + return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, + sched.getResourceUsage(), desiredFairShare); + } + + /** + * Check for queues that need tasks preempted, either because they have been + * below their guaranteed share for minSharePreemptionTimeout or they have + * been below half their fair share for the fairSharePreemptionTimeout. If + * such queues exist, compute how many tasks of each type need to be preempted + * and then select the right ones using preemptTasks. 
+ */ + protected synchronized void preemptTasksIfNecessary() { + if (!preemptionEnabled) { + return; + } + + long curTime = clock.getTime(); + if (curTime - lastPreemptCheckTime < preemptionInterval) { + return; + } + lastPreemptCheckTime = curTime; + + Resource resToPreempt = Resources.none(); + + for (FSLeafQueue sched : queueMgr.getLeafQueues()) { + resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); + } + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt, + Resources.none())) { + preemptResources(queueMgr.getLeafQueues(), resToPreempt); + } + } + + /** + * Preempt a quantity of resources from a list of QueueSchedulables. The + * policy for this is to pick apps from queues that are over their fair share, + * but make sure that no queue is placed below its fair share in the process. + * We further prioritize preemption by choosing containers with lowest + * priority to preempt. + */ + protected void preemptResources(Collection scheds, + Resource toPreempt) { + if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) { + return; + } + + Map apps = + new HashMap(); + Map queues = + new HashMap(); + + // Collect running containers from over-scheduled queues + List runningContainers = new ArrayList(); + for (FSLeafQueue sched : scheds) { + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + sched.getResourceUsage(), sched.getFairShare())) { + for (AppSchedulable as : sched.getRunnableAppSchedulables()) { + for (RMContainer c : as.getApp().getLiveContainers()) { + runningContainers.add(c); + apps.put(c, as.getApp()); + queues.put(c, sched); + } + } + } + } + + // Sort containers into reverse order of priority + Collections.sort(runningContainers, new Comparator() { + public int compare(RMContainer c1, RMContainer c2) { + int ret = c1.getContainer().getPriority().compareTo( + c2.getContainer().getPriority()); + if (ret == 0) { + return c2.getContainerId().compareTo(c1.getContainerId()); + } + return ret; + } + }); + + // Scan down the list of containers we've already warned and kill them + // if we need to. Remove any containers from the list that we don't need + // or that are no longer running. 
+ Iterator warnedIter = warnedContainers.iterator(); + Set preemptedThisRound = new HashSet(); + while (warnedIter.hasNext()) { + RMContainer container = warnedIter.next(); + if (container.getState() == RMContainerState.RUNNING && + Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + toPreempt, Resources.none())) { + warnOrKillContainer(container, apps.get(container), queues.get(container)); + preemptedThisRound.add(container); + Resources.subtractFrom(toPreempt, container.getContainer().getResource()); + } else { + warnedIter.remove(); + } + } + + // Scan down the rest of the containers until we've preempted enough, making + // sure we don't preempt too many from any queue + Iterator runningIter = runningContainers.iterator(); + while (runningIter.hasNext() && + Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + toPreempt, Resources.none())) { + RMContainer container = runningIter.next(); + FSLeafQueue sched = queues.get(container); + if (!preemptedThisRound.contains(container) && + Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + sched.getResourceUsage(), sched.getFairShare())) { + warnOrKillContainer(container, apps.get(container), sched); + + warnedContainers.add(container); + Resources.subtractFrom(toPreempt, container.getContainer().getResource()); + } + } + } + + private void warnOrKillContainer(RMContainer container, FSSchedulerApp app, + FSLeafQueue queue) { + LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + + "res=" + container.getContainer().getResource() + + ") from queue " + queue.getName()); + + Long time = app.getContainerPreemptionTime(container); + + if (time != null) { + // if we asked for preemption more than maxWaitTimeBeforeKill ms ago, + // proceed with kill + if (time + waitTimeBeforeKill < clock.getTime()) { + ContainerStatus status = + SchedulerUtils.createPreemptedContainerStatus( + container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); + + // TODO: Not sure if this ever actually adds this to the list of cleanup + // containers on the RMNode (see SchedulerNode.releaseContainer()). + completedContainer(container, status, RMContainerEventType.KILL); + LOG.info("Killing container" + container + + " (after waiting for premption for " + + (clock.getTime() - time) + "ms)"); + } + } else { + // track the request in the FSSchedulerApp itself + app.addPreemption(container, clock.getTime()); + } + } + + /** + * Return the resource amount that this queue is allowed to preempt, if any. + * If the queue has been below its min share for at least its preemption + * timeout, it should preempt the difference between its current share and + * this min share. If it has been below half its fair share for at least the + * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its + * full fair share. If both conditions hold, we preempt the max of the two + * amounts (this shouldn't happen unless someone sets the timeouts to be + * identical for some reason). 
+ */ + protected Resource resToPreempt(FSLeafQueue sched, long curTime) { + String queue = sched.getName(); + long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue); + long fairShareTimeout = allocConf.getFairSharePreemptionTimeout(); + Resource resDueToMinShare = Resources.none(); + Resource resDueToFairShare = Resources.none(); + if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { + Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + sched.getMinShare(), sched.getDemand()); + resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + Resources.none(), Resources.subtract(target, sched.getResourceUsage())); + } + if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { + Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + sched.getFairShare(), sched.getDemand()); + resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + Resources.none(), Resources.subtract(target, sched.getResourceUsage())); + } + Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + resDueToMinShare, resDueToFairShare); + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + resToPreempt, Resources.none())) { + String message = "Should preempt " + resToPreempt + " res for queue " + + sched.getName() + ": resDueToMinShare = " + resDueToMinShare + + ", resDueToFairShare = " + resDueToFairShare; + LOG.info(message); + } + return resToPreempt; + } + + public RMContainerTokenSecretManager getContainerTokenSecretManager() { + return rmContext.getContainerTokenSecretManager(); + } + + // synchronized for sizeBasedWeight + public synchronized ResourceWeights getAppWeight(AppSchedulable app) { + double weight = 1.0; + if (sizeBasedWeight) { + // Set weight based on current memory demand + weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2); + } + weight *= app.getPriority().getPriority(); + if (weightAdjuster != null) { + // Run weight through the user-supplied weightAdjuster + weight = weightAdjuster.adjustWeight(app, weight); + } + return new ResourceWeights((float)weight); + } + + @Override + public Resource getMinimumResourceCapability() { + return minimumAllocation; + } + + public Resource getIncrementResourceCapability() { + return incrAllocation; + } + + @Override + public Resource getMaximumResourceCapability() { + return maximumAllocation; + } + + public double getNodeLocalityThreshold() { + return nodeLocalityThreshold; + } + + public double getRackLocalityThreshold() { + return rackLocalityThreshold; + } + + public long getNodeLocalityDelayMs() { + return nodeLocalityDelayMs; + } + + public long getRackLocalityDelayMs() { + return rackLocalityDelayMs; + } + + public boolean isContinuousSchedulingEnabled() { + return continuousSchedulingEnabled; + } + + public synchronized int getContinuousSchedulingSleepMs() { + return continuousSchedulingSleepMs; + } + + public Resource getClusterCapacity() { + return clusterCapacity; + } + + public synchronized Clock getClock() { + return clock; + } + + protected synchronized void setClock(Clock clock) { + this.clock = clock; + } + + public FairSchedulerEventLog getEventLog() { + return eventLog; + } + + /** + * Add a new application to the scheduler, with a given id, queue name, and + * user. This will accept a new app even if the user or queue is above + * configured limits, but the app will not be marked as runnable. 
+ */ + protected synchronized void addApplication(ApplicationId applicationId, + String queueName, String user) { + if (queueName == null || queueName.isEmpty()) { + String message = "Reject application " + applicationId + + " submitted by user " + user + " with an empty queue name."; + LOG.info(message); + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return; + } + + RMApp rmApp = rmContext.getRMApps().get(applicationId); + FSLeafQueue queue = assignToQueue(rmApp, queueName, user); + if (queue == null) { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppRejectedEvent(applicationId, + "Application rejected by queue placement policy")); + return; + } + + // Enforce ACLs + UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); + + if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) + && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) { + String msg = "User " + userUgi.getUserName() + + " cannot submit applications to queue " + queue.getName(); + LOG.info(msg); + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, msg)); + return; + } + + SchedulerApplication application = + new SchedulerApplication(queue, user); + applications.put(applicationId, application); + queue.getMetrics().submitApp(user); + + LOG.info("Accepted application " + applicationId + " from user: " + user + + ", in queue: " + queueName + ", currently num of applications: " + + applications.size()); + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } + + /** + * Add a new application attempt to the scheduler. + */ + protected synchronized void addApplicationAttempt( + ApplicationAttemptId applicationAttemptId, + boolean transferStateFromPreviousAttempt) { + SchedulerApplication application = + applications.get(applicationAttemptId.getApplicationId()); + String user = application.getUser(); + FSLeafQueue queue = (FSLeafQueue) application.getQueue(); + + FSSchedulerApp attempt = + new FSSchedulerApp(applicationAttemptId, user, + queue, new ActiveUsersManager(getRootQueueMetrics()), + rmContext); + if (transferStateFromPreviousAttempt) { + attempt.transferStateFromPreviousAttempt(application + .getCurrentAppAttempt()); + } + application.setCurrentAppAttempt(attempt); + + boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); + queue.addApp(attempt, runnable); + if (runnable) { + maxRunningEnforcer.trackRunnableApp(attempt); + } else { + maxRunningEnforcer.trackNonRunnableApp(attempt); + } + + queue.getMetrics().submitAppAttempt(user); + + LOG.info("Added Application Attempt " + applicationAttemptId + + " to scheduler from user: " + user); + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); + } + + @VisibleForTesting + FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { + FSLeafQueue queue = null; + try { + QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy(); + queueName = placementPolicy.assignAppToQueue(queueName, user); + if (queueName == null) { + return null; + } + queue = queueMgr.getLeafQueue(queueName, true); + } catch (IOException ex) { + LOG.error("Error assigning app to queue, rejecting", ex); + } + + if (rmApp != null) { + rmApp.setQueue(queue.getName()); + } else { + LOG.warn("Couldn't find RM app to set queue name on"); + } + + return queue; + } + + private 
synchronized void removeApplication(ApplicationId applicationId, + RMAppState finalState) { + SchedulerApplication application = applications.get(applicationId); + if (application == null){ + LOG.warn("Couldn't find application " + applicationId); + return; + } + application.stop(finalState); + applications.remove(applicationId); + } + + private synchronized void removeApplicationAttempt( + ApplicationAttemptId applicationAttemptId, + RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { + LOG.info("Application " + applicationAttemptId + " is done." + + " finalState=" + rmAppAttemptFinalState); + SchedulerApplication application = + applications.get(applicationAttemptId.getApplicationId()); + FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId); + + if (attempt == null || application == null) { + LOG.info("Unknown application " + applicationAttemptId + " has completed!"); + return; + } + + // Release all the running containers + for (RMContainer rmContainer : attempt.getLiveContainers()) { + if (keepContainers + && rmContainer.getState().equals(RMContainerState.RUNNING)) { + // do not kill the running container in the case of work-preserving AM + // restart. + LOG.info("Skip killing " + rmContainer.getContainerId()); + continue; + } + completedContainer(rmContainer, + SchedulerUtils.createAbnormalContainerStatus( + rmContainer.getContainerId(), + SchedulerUtils.COMPLETED_APPLICATION), + RMContainerEventType.KILL); + } + + // Release all reserved containers + for (RMContainer rmContainer : attempt.getReservedContainers()) { + completedContainer(rmContainer, + SchedulerUtils.createAbnormalContainerStatus( + rmContainer.getContainerId(), + "Application Complete"), + RMContainerEventType.KILL); + } + // Clean up pending requests, metrics etc. + attempt.stop(rmAppAttemptFinalState); + + // Inform the queue + FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue() + .getQueueName(), false); + boolean wasRunnable = queue.removeApp(attempt); + + if (wasRunnable) { + maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt); + } else { + maxRunningEnforcer.untrackNonRunnableApp(attempt); + } + } + + /** + * Clean up a completed container. 
+ */ + private synchronized void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + if (rmContainer == null) { + LOG.info("Null container completed..."); + return; + } + + Container container = rmContainer.getContainer(); + + // Get the application for the finished container + FSSchedulerApp application = + getCurrentAttemptForContainer(container.getId()); + ApplicationId appId = + container.getId().getApplicationAttemptId().getApplicationId(); + if (application == null) { + LOG.info("Container " + container + " of" + + " unknown application attempt " + appId + + " completed with event " + event); + return; + } + + // Get the node on which the container was allocated + FSSchedulerNode node = nodes.get(container.getNodeId()); + + if (rmContainer.getState() == RMContainerState.RESERVED) { + application.unreserve(node, rmContainer.getReservedPriority()); + node.unreserveResource(application); + } else { + application.containerCompleted(rmContainer, containerStatus, event); + node.releaseContainer(container); + updateRootQueueMetrics(); + } + + LOG.info("Application attempt " + application.getApplicationAttemptId() + + " released container " + container.getId() + " on node: " + node + + " with event: " + event); + } + + private synchronized void addNode(RMNode node) { + nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName)); + Resources.addTo(clusterCapacity, node.getTotalCapability()); + updateRootQueueMetrics(); + + LOG.info("Added node " + node.getNodeAddress() + + " cluster capacity: " + clusterCapacity); + } + + private synchronized void removeNode(RMNode rmNode) { + FSSchedulerNode node = nodes.get(rmNode.getNodeID()); + // This can occur when an UNHEALTHY node reconnects + if (node == null) { + return; + } + Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); + updateRootQueueMetrics(); + + // Remove running containers + List runningContainers = node.getRunningContainers(); + for (RMContainer container : runningContainers) { + completedContainer(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), + SchedulerUtils.LOST_CONTAINER), + RMContainerEventType.KILL); + } + + // Remove reservations, if any + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { + completedContainer(reservedContainer, + SchedulerUtils.createAbnormalContainerStatus( + reservedContainer.getContainerId(), + SchedulerUtils.LOST_CONTAINER), + RMContainerEventType.KILL); + } + + nodes.remove(rmNode.getNodeID()); + LOG.info("Removed node " + rmNode.getNodeAddress() + + " cluster capacity: " + clusterCapacity); + } + + @Override + public Allocation allocate(ApplicationAttemptId appAttemptId, + List ask, List release, List blacklistAdditions, List blacklistRemovals) { + + // Make sure this application exists + FSSchedulerApp application = getSchedulerApp(appAttemptId); + if (application == null) { + LOG.info("Calling allocate on removed " + + "or non existant application " + appAttemptId); + return EMPTY_ALLOCATION; + } + + // Sanity check + SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), + clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation); + + // Release containers + for (ContainerId releasedContainerId : release) { + RMContainer rmContainer = getRMContainer(releasedContainerId); + if (rmContainer == null) { + RMAuditLogger.logFailure(application.getUser(), + AuditConstants.RELEASE_CONTAINER, + 
"Unauthorized access or invalid container", "FairScheduler", + "Trying to release container not owned by app or with invalid id", + application.getApplicationId(), releasedContainerId); + } + completedContainer(rmContainer, + SchedulerUtils.createAbnormalContainerStatus( + releasedContainerId, + SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED); + } + + synchronized (application) { + if (!ask.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: pre-update" + + " applicationAttemptId=" + appAttemptId + + " application=" + application.getApplicationId()); + } + application.showRequests(); + + // Update application requests + application.updateResourceRequests(ask); + + LOG.debug("allocate: post-update"); + application.showRequests(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("allocate:" + + " applicationAttemptId=" + appAttemptId + + " #ask=" + ask.size()); + + LOG.debug("Preempting " + application.getPreemptionContainers().size() + + " container(s)"); + } + + Set preemptionContainerIds = new HashSet(); + for (RMContainer container : application.getPreemptionContainers()) { + preemptionContainerIds.add(container.getContainerId()); + } + + application.updateBlacklist(blacklistAdditions, blacklistRemovals); + + return new Allocation(application.pullNewlyAllocatedContainers(), + application.getHeadroom(), preemptionContainerIds); + } + } + + /** + * Process a container which has launched on a node, as reported by the node. + */ + private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { + // Get the application for the finished container + FSSchedulerApp application = getCurrentAttemptForContainer(containerId); + if (application == null) { + LOG.info("Unknown application " + + containerId.getApplicationAttemptId().getApplicationId() + + " launched container " + containerId + " on node: " + node); + return; + } + + application.containerLaunchedOnNode(containerId, node.getNodeID()); + } + + /** + * Process a heartbeat update from a node. 
+ */ + private synchronized void nodeUpdate(RMNode nm) { + if (LOG.isDebugEnabled()) { + LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity); + } + eventLog.log("HEARTBEAT", nm.getHostName()); + FSSchedulerNode node = nodes.get(nm.getNodeID()); + + // Update resource if any change + SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG); + + List containerInfoList = nm.pullContainerUpdates(); + List newlyLaunchedContainers = new ArrayList(); + List completedContainers = new ArrayList(); + for(UpdatedContainerInfo containerInfo : containerInfoList) { + newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } + // Processing the newly launched containers + for (ContainerStatus launchedContainer : newlyLaunchedContainers) { + containerLaunchedOnNode(launchedContainer.getContainerId(), node); + } + + // Process completed containers + for (ContainerStatus completedContainer : completedContainers) { + ContainerId containerId = completedContainer.getContainerId(); + LOG.debug("Container FINISHED: " + containerId); + completedContainer(getRMContainer(containerId), + completedContainer, RMContainerEventType.FINISHED); + } + + if (continuousSchedulingEnabled) { + if (!completedContainers.isEmpty()) { + attemptScheduling(node); + } + } else { + attemptScheduling(node); + } + } + + private void continuousScheduling() { + while (true) { + List nodeIdList = new ArrayList(nodes.keySet()); + Collections.sort(nodeIdList, nodeAvailableResourceComparator); + + // iterate all nodes + for (NodeId nodeId : nodeIdList) { + if (nodes.containsKey(nodeId)) { + FSSchedulerNode node = nodes.get(nodeId); + try { + if (Resources.fitsIn(minimumAllocation, + node.getAvailableResource())) { + attemptScheduling(node); + } + } catch (Throwable ex) { + LOG.warn("Error while attempting scheduling for node " + node + + ": " + ex.toString(), ex); + } + } + } + try { + Thread.sleep(getContinuousSchedulingSleepMs()); + } catch (InterruptedException e) { + LOG.warn("Error while doing sleep in continuous scheduling: " + + e.toString(), e); + } + } + } + + /** Sort nodes by available resource */ + private class NodeAvailableResourceComparator implements Comparator { + + @Override + public int compare(NodeId n1, NodeId n2) { + return RESOURCE_CALCULATOR.compare(clusterCapacity, + nodes.get(n2).getAvailableResource(), + nodes.get(n1).getAvailableResource()); + } + } + + private synchronized void attemptScheduling(FSSchedulerNode node) { + // Assign new containers... + // 1. Check for reserved applications + // 2. 
Schedule if there are no reservations + + AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable(); + if (reservedAppSchedulable != null) { + Priority reservedPriority = node.getReservedContainer().getReservedPriority(); + if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) { + // Don't hold the reservation if app can no longer use it + LOG.info("Releasing reservation that cannot be satisfied for application " + + reservedAppSchedulable.getApp().getApplicationAttemptId() + + " on node " + node); + reservedAppSchedulable.unreserve(reservedPriority, node); + reservedAppSchedulable = null; + } else { + // Reservation exists; try to fulfill the reservation + LOG.info("Trying to fulfill reservation for application " + + reservedAppSchedulable.getApp().getApplicationAttemptId() + + " on node: " + node); + + node.getReservedAppSchedulable().assignReservedContainer(node); + } + } + if (reservedAppSchedulable == null) { + // No reservation, schedule at queue which is farthest below fair share + int assignedContainers = 0; + while (node.getReservedContainer() == null) { + boolean assignedContainer = false; + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + queueMgr.getRootQueue().assignContainer(node), + Resources.none())) { + assignedContainers++; + assignedContainer = true; + } + if (!assignedContainer) { break; } + if (!assignMultiple) { break; } + if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; } + } + } + updateRootQueueMetrics(); + } + + @Override + public SchedulerNodeReport getNodeReport(NodeId nodeId) { + FSSchedulerNode node = nodes.get(nodeId); + return node == null ? null : new SchedulerNodeReport(node); + } + + public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { + SchedulerApplication app = + applications.get(appAttemptId.getApplicationId()); + if (app != null) { + return (FSSchedulerApp) app.getCurrentAppAttempt(); + } + return null; + } + + @Override + public SchedulerAppReport getSchedulerAppInfo( + ApplicationAttemptId appAttemptId) { + FSSchedulerApp attempt = getSchedulerApp(appAttemptId); + if (attempt == null) { + LOG.error("Request for appInfo of unknown attempt" + appAttemptId); + return null; + } + return new SchedulerAppReport(attempt); + } + + @Override + public ApplicationResourceUsageReport getAppResourceUsageReport( + ApplicationAttemptId appAttemptId) { + FSSchedulerApp attempt = getSchedulerApp(appAttemptId); + if (attempt == null) { + LOG.error("Request for appInfo of unknown attempt" + appAttemptId); + return null; + } + return attempt.getResourceUsageReport(); + } + + /** + * Subqueue metrics might be a little out of date because fair shares are + * recalculated at the update interval, but the root queue metrics needs to + * be updated synchronously with allocations and completions so that cluster + * metrics will be consistent. 
+ */ + private void updateRootQueueMetrics() { + rootMetrics.setAvailableResourcesToQueue( + Resources.subtract( + clusterCapacity, rootMetrics.getAllocatedResources())); + } + + @Override + public QueueMetrics getRootQueueMetrics() { + return rootMetrics; + } + + @Override + public void handle(SchedulerEvent event) { + switch (event.getType()) { + case NODE_ADDED: + if (!(event instanceof NodeAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; + addNode(nodeAddedEvent.getAddedRMNode()); + break; + case NODE_REMOVED: + if (!(event instanceof NodeRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; + removeNode(nodeRemovedEvent.getRemovedRMNode()); + break; + case NODE_UPDATE: + if (!(event instanceof NodeUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; + nodeUpdate(nodeUpdatedEvent.getRMNode()); + break; + case APP_ADDED: + if (!(event instanceof AppAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; + addApplication(appAddedEvent.getApplicationId(), + appAddedEvent.getQueue(), appAddedEvent.getUser()); + break; + case APP_REMOVED: + if (!(event instanceof AppRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event; + removeApplication(appRemovedEvent.getApplicationID(), + appRemovedEvent.getFinalState()); + break; + case APP_ATTEMPT_ADDED: + if (!(event instanceof AppAttemptAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + AppAttemptAddedSchedulerEvent appAttemptAddedEvent = + (AppAttemptAddedSchedulerEvent) event; + addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), + appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + break; + case APP_ATTEMPT_REMOVED: + if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent = + (AppAttemptRemovedSchedulerEvent) event; + removeApplicationAttempt( + appAttemptRemovedEvent.getApplicationAttemptID(), + appAttemptRemovedEvent.getFinalAttemptState(), + appAttemptRemovedEvent.getKeepContainersAcrossAppAttempts()); + break; + case CONTAINER_EXPIRED: + if (!(event instanceof ContainerExpiredSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + ContainerExpiredSchedulerEvent containerExpiredEvent = + (ContainerExpiredSchedulerEvent)event; + ContainerId containerId = containerExpiredEvent.getContainerId(); + completedContainer(getRMContainer(containerId), + SchedulerUtils.createAbnormalContainerStatus( + containerId, + SchedulerUtils.EXPIRED_CONTAINER), + RMContainerEventType.EXPIRE); + break; + default: + LOG.error("Unknown event arrived at FairScheduler: " + event.toString()); + } + } + + @Override + public void recover(RMState state) throws Exception { + // NOT IMPLEMENTED + } + + @Override + public synchronized void reinitialize(Configuration conf, RMContext rmContext) + throws IOException { + if (!initialized) { + this.conf = new 
FairSchedulerConfiguration(conf); + validateConf(this.conf); + minimumAllocation = this.conf.getMinimumAllocation(); + maximumAllocation = this.conf.getMaximumAllocation(); + incrAllocation = this.conf.getIncrementAllocation(); + continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); + continuousSchedulingSleepMs = + this.conf.getContinuousSchedulingSleepMs(); + nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); + rackLocalityThreshold = this.conf.getLocalityThresholdRack(); + nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); + rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); + preemptionEnabled = this.conf.getPreemptionEnabled(); + assignMultiple = this.conf.getAssignMultiple(); + maxAssign = this.conf.getMaxAssign(); + sizeBasedWeight = this.conf.getSizeBasedWeight(); + preemptionInterval = this.conf.getPreemptionInterval(); + waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); + usePortForNodeName = this.conf.getUsePortForNodeName(); + + rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); + this.rmContext = rmContext; + this.eventLog = new FairSchedulerEventLog(); + eventLog.init(this.conf); + + initialized = true; + + allocConf = new AllocationConfiguration(conf); + try { + queueMgr.initialize(conf); + } catch (Exception e) { + throw new IOException("Failed to start FairScheduler", e); + } + + Thread updateThread = new Thread(new UpdateThread()); + updateThread.setName("FairSchedulerUpdateThread"); + updateThread.setDaemon(true); + updateThread.start(); + + if (continuousSchedulingEnabled) { + // start continuous scheduling thread + Thread schedulingThread = new Thread( + new Runnable() { + @Override + public void run() { + continuousScheduling(); + } + } + ); + schedulingThread.setName("ContinuousScheduling"); + schedulingThread.setDaemon(true); + schedulingThread.start(); + } + + allocsLoader.init(conf); + allocsLoader.setReloadListener(new AllocationReloadListener()); + // If we fail to load allocations file on initialize, we want to fail + // immediately. After a successful load, exceptions on future reloads + // will just result in leaving things as they are. 
+ try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + throw new IOException("Failed to initialize FairScheduler", e); + } + allocsLoader.start(); + } else { + try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + LOG.error("Failed to reload allocations file", e); + } + } + } + + @Override + public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, + boolean recursive) throws IOException { + if (!queueMgr.exists(queueName)) { + throw new IOException("queue " + queueName + " does not exist"); + } + return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues, + recursive); + } + + @Override + public List getQueueUserAclInfo() { + UserGroupInformation user = null; + try { + user = UserGroupInformation.getCurrentUser(); + } catch (IOException ioe) { + return new ArrayList(); + } + + return queueMgr.getRootQueue().getQueueUserAclInfo(user); + } + + @Override + public int getNumClusterNodes() { + return nodes.size(); + } + + @Override + public synchronized boolean checkAccess(UserGroupInformation callerUGI, + QueueACL acl, String queueName) { + FSQueue queue = getQueueManager().getQueue(queueName); + if (queue == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("ACL not found for queue access-type " + acl + + " for queue " + queueName); + } + return false; + } + return queue.hasAccess(acl, callerUGI); + } + + public AllocationConfiguration getAllocationConfiguration() { + return allocConf; + } + + private class AllocationReloadListener implements + AllocationFileLoaderService.Listener { + + @Override + public void onReload(AllocationConfiguration queueInfo) { + // Commit the reload; also create any queue defined in the alloc file + // if it does not already exist, so it can be displayed on the web UI. 
+ synchronized (FairScheduler.this) { + allocConf = queueInfo; + allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity); + queueMgr.updateAllocationConfiguration(allocConf); + } + } + } + + @Override + public List<ApplicationAttemptId> getAppsInQueue(String queueName) { + FSQueue queue = queueMgr.getQueue(queueName); + if (queue == null) { + return null; + } + List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(); + queue.collectSchedulerApplications(apps); + return apps; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index d88d1e26a13..696a64c3c02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -104,7 +105,8 @@ import com.google.common.annotations.VisibleForTesting; @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") -public class FifoScheduler implements ResourceScheduler, Configurable { +public class FifoScheduler extends AbstractYarnScheduler implements + ResourceScheduler, Configurable { private static final Log LOG = LogFactory.getLog(FifoScheduler.class); @@ -115,7 +117,6 @@ public class FifoScheduler implements ResourceScheduler, Configurable { private final static Container[] EMPTY_CONTAINER_ARRAY = new Container[] {}; private final static List<Container> EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY); - private RMContext rmContext; protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>(); @@ -124,11 +125,6 @@ public class FifoScheduler implements ResourceScheduler, Configurable { private Resource maximumAllocation; private boolean usePortForNodeName; - // Use ConcurrentSkipListMap because applications need to be ordered - @VisibleForTesting - protected Map<ApplicationId, SchedulerApplication> applications = - new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>(); - private ActiveUsersManager activeUsersManager; private static final String DEFAULT_QUEUE_NAME = "default"; @@ -243,6 +239,9 @@ public class FifoScheduler implements ResourceScheduler, Configurable { if (!this.initialized) { validateConf(conf); this.rmContext = rmContext; + //Use ConcurrentSkipListMap because applications need to be ordered + this.applications = + new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>(); + this.minimumAllocation = Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 4d38e7c3ca1..f8329d68a75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -24,6 +24,7 @@ import java.util.List; import junit.framework.Assert; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -150,7 +151,29 @@ public class TestAMRestart { ApplicationAttemptId newAttemptId = app1.getCurrentAppAttempt().getAppAttemptId(); Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId())); - MockAM am2 = MockRM.launchAM(app1, rm1, nm1); + + // launch the new AM + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + nm1.nodeHeartbeat(true); + MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId()); + RegisterApplicationMasterResponse registerResponse = + am2.registerAppAttempt(); + + // Assert two containers are running: container2 and container3; + Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempt() + .size()); + boolean containerId2Exists = false, containerId3Exists = false; + for (Container container : registerResponse + .getContainersFromPreviousAttempt()) { + if (container.getId().equals(containerId2)) { + containerId2Exists = true; + } + if (container.getId().equals(containerId3)) { + containerId3Exists = true; + } + } + Assert.assertTrue(containerId2Exists && containerId3Exists); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); // complete container by sending the container complete event which has earlier // attempt's attemptId diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index fbd7ec72a6d..08efe29453e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -642,7 +642,7 @@ public class TestCapacityScheduler { SchedulerApplication app = TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( - cs.applications, cs, "a1"); + cs.getSchedulerApplications(), cs, "a1"); Assert.assertEquals("a1", app.getQueue().getQueueName()); } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 005cf5574db..b251ce7dd19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; @@ -260,7 +259,7 @@ public class TestFairScheduler { scheduler.addApplication(id.getApplicationId(), queueId, userId); // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. - if (scheduler.applications.containsKey(id.getApplicationId())) { + if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { scheduler.addApplicationAttempt(id, false); } List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); @@ -2546,6 +2545,6 @@ public class TestFairScheduler { FairScheduler scheduler = (FairScheduler) resourceManager.getResourceScheduler(); TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( - scheduler.applications, scheduler, "default"); + scheduler.getSchedulerApplications(), scheduler, "default"); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 2dc0e8805e7..af819d1787e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -591,8 +591,8 @@ public class TestFifoScheduler { ResourceScheduler.class); MockRM rm = new MockRM(conf); FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler(); - TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(fs.applications, - fs, "queue"); + TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( + fs.getSchedulerApplications(), fs, "queue"); } private void checkApplicationResourceUsage(int expected, diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java.orig b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java.orig new file mode 100644 index 00000000000..2dc0e8805e7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java.orig @@ -0,0 +1,615 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.Application; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.Task; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestFifoScheduler { + private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); + private final int GB = 1024; + + private ResourceManager resourceManager = null; + + private static final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + @Before + public void setUp() throws Exception { + resourceManager = new ResourceManager(); + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, + FifoScheduler.class, ResourceScheduler.class); + resourceManager.init(conf); + } + + @After + public void tearDown() throws Exception { + resourceManager.stop(); + } + + private org.apache.hadoop.yarn.server.resourcemanager.NodeManager + registerNode(String hostName, int containerManagerPort, int nmHttpPort, + String rackName, Resource capability) throws IOException, + YarnException { + return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( + hostName, containerManagerPort, nmHttpPort, rackName, capability, + resourceManager); + } + + private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { + ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); + ApplicationAttemptId attId = + ApplicationAttemptId.newInstance(appIdImpl, attemptId); + return attId; + } + + private ResourceRequest createResourceRequest(int memory, String host, + int priority, int numContainers) { + ResourceRequest request = recordFactory + .newRecordInstance(ResourceRequest.class); + request.setCapability(Resources.createResource(memory)); + request.setResourceName(host); + request.setNumContainers(numContainers); + Priority prio = recordFactory.newRecordInstance(Priority.class); + prio.setPriority(priority); + request.setPriority(prio); + return request; + } + + @Test(timeout=5000) + public void testFifoSchedulerCapacityWhenNoNMs() { + FifoScheduler scheduler = new FifoScheduler(); + QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false); + Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity()); + } + + @Test(timeout=5000) + public void testAppAttemptMetrics() throws Exception { + AsyncDispatcher dispatcher = new InlineDispatcher(); + 
RMContext rmContext = new RMContextImpl(dispatcher, null, + null, null, null, null, null, null, null); + + FifoScheduler schedular = new FifoScheduler(); + schedular.reinitialize(new Configuration(), rmContext); + QueueMetrics metrics = schedular.getRootQueueMetrics(); + int beforeAppsSubmitted = metrics.getAppsSubmitted(); + + ApplicationId appId = BuilderUtils.newApplicationId(200, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 1); + + SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user"); + schedular.handle(appEvent); + SchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId, false); + schedular.handle(attemptEvent); + + appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2); + SchedulerEvent attemptEvent2 = + new AppAttemptAddedSchedulerEvent(appAttemptId, false); + schedular.handle(attemptEvent2); + + int afterAppsSubmitted = metrics.getAppsSubmitted(); + Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted); + } + + @Test(timeout=2000) + public void testNodeLocalAssignment() throws Exception { + AsyncDispatcher dispatcher = new InlineDispatcher(); + Configuration conf = new Configuration(); + RMContainerTokenSecretManager containerTokenSecretManager = + new RMContainerTokenSecretManager(conf); + containerTokenSecretManager.rollMasterKey(); + NMTokenSecretManagerInRM nmTokenSecretManager = + new NMTokenSecretManagerInRM(conf); + nmTokenSecretManager.rollMasterKey(); + RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, + null, containerTokenSecretManager, nmTokenSecretManager, null); + + FifoScheduler scheduler = new FifoScheduler(); + scheduler.reinitialize(new Configuration(), rmContext); + + RMNode node0 = MockNodes.newNodeInfo(1, + Resources.createResource(1024 * 64), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0); + scheduler.handle(nodeEvent1); + + int _appId = 1; + int _appAttemptId = 1; + ApplicationAttemptId appAttemptId = createAppAttemptId(_appId, + _appAttemptId); + AppAddedSchedulerEvent appEvent = + new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1", + "user1"); + scheduler.handle(appEvent); + AppAttemptAddedSchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId, false); + scheduler.handle(attemptEvent); + + int memory = 64; + int nConts = 3; + int priority = 20; + + List ask = new ArrayList(); + ResourceRequest nodeLocal = createResourceRequest(memory, + node0.getHostName(), priority, nConts); + ResourceRequest rackLocal = createResourceRequest(memory, + node0.getRackName(), priority, nConts); + ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority, + nConts); + ask.add(nodeLocal); + ask.add(rackLocal); + ask.add(any); + scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null); + + NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); + + // Before the node update event, there are 3 local requests outstanding + Assert.assertEquals(3, nodeLocal.getNumContainers()); + + scheduler.handle(node0Update); + + // After the node update event, check that there are no more local requests + // outstanding + Assert.assertEquals(0, nodeLocal.getNumContainers()); + //Also check that the containers were scheduled + SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId); + Assert.assertEquals(3, info.getLiveContainers().size()); + } + + @Test(timeout=2000) + public void testUpdateResourceOnNode() 
throws Exception { + AsyncDispatcher dispatcher = new InlineDispatcher(); + Configuration conf = new Configuration(); + RMContainerTokenSecretManager containerTokenSecretManager = + new RMContainerTokenSecretManager(conf); + containerTokenSecretManager.rollMasterKey(); + NMTokenSecretManagerInRM nmTokenSecretManager = + new NMTokenSecretManagerInRM(conf); + nmTokenSecretManager.rollMasterKey(); + RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, + null, containerTokenSecretManager, nmTokenSecretManager, null); + + FifoScheduler scheduler = new FifoScheduler(){ + @SuppressWarnings("unused") + public Map getNodes(){ + return nodes; + } + }; + scheduler.reinitialize(new Configuration(), rmContext); + RMNode node0 = MockNodes.newNodeInfo(1, + Resources.createResource(2048, 4), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0); + scheduler.handle(nodeEvent1); + + Method method = scheduler.getClass().getDeclaredMethod("getNodes"); + @SuppressWarnings("unchecked") + Map schedulerNodes = + (Map) method.invoke(scheduler); + assertEquals(schedulerNodes.values().size(), 1); + + // set resource of RMNode to 1024 and verify it works. + node0.setResourceOption(ResourceOption.newInstance( + Resources.createResource(1024, 4), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)); + assertEquals(node0.getTotalCapability().getMemory(), 1024); + // verify that SchedulerNode's resource hasn't been changed. + assertEquals(schedulerNodes.get(node0.getNodeID()). + getAvailableResource().getMemory(), 2048); + // now, NM heartbeat comes. + NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); + scheduler.handle(node0Update); + // SchedulerNode's available resource is changed. + assertEquals(schedulerNodes.get(node0.getNodeID()). + getAvailableResource().getMemory(), 1024); + QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false); + Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity()); + + int _appId = 1; + int _appAttemptId = 1; + ApplicationAttemptId appAttemptId = createAppAttemptId(_appId, + _appAttemptId); + AppAddedSchedulerEvent appEvent = + new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1", + "user1"); + scheduler.handle(appEvent); + AppAttemptAddedSchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId, false); + scheduler.handle(attemptEvent); + + int memory = 1024; + int priority = 1; + + List ask = new ArrayList(); + ResourceRequest nodeLocal = createResourceRequest(memory, + node0.getHostName(), priority, 1); + ResourceRequest rackLocal = createResourceRequest(memory, + node0.getRackName(), priority, 1); + ResourceRequest any = createResourceRequest(memory, ResourceRequest.ANY, priority, + 1); + ask.add(nodeLocal); + ask.add(rackLocal); + ask.add(any); + scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null); + + // Before the node update event, there are one local request + Assert.assertEquals(1, nodeLocal.getNumContainers()); + + // Now schedule. + scheduler.handle(node0Update); + + // After the node update event, check no local request + Assert.assertEquals(0, nodeLocal.getNumContainers()); + // Also check that one container was scheduled + SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId); + Assert.assertEquals(1, info.getLiveContainers().size()); + // And check the default Queue now is full. 
+ queueInfo = scheduler.getQueueInfo(null, false, false); + Assert.assertEquals(1.0f, queueInfo.getCurrentCapacity()); + } + +// @Test + public void testFifoScheduler() throws Exception { + + LOG.info("--- START: testFifoScheduler ---"); + + final int GB = 1024; + + // Register node1 + String host_0 = "host_0"; + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(4 * GB, 1)); + nm_0.heartbeat(); + + // Register node2 + String host_1 = "host_1"; + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(2 * GB, 1)); + nm_1.heartbeat(); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.create(1); + + // Submit an application + Application application_0 = new Application("user_0", resourceManager); + application_0.submit(); + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(GB); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = new Task(application_0, priority_1, + new String[] {host_0, host_1}); + application_0.addTask(task_0_0); + + // Submit another application + Application application_1 = new Application("user_1", resourceManager); + application_1.submit(); + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(3 * GB); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(4 * GB); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = new Task(application_1, priority_1, + new String[] {host_0, host_1}); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + LOG.info("Send resource requests to the scheduler"); + application_0.schedule(); + application_1.schedule(); + + // Send a heartbeat to kick the tires on the Scheduler + LOG.info("Send a heartbeat to kick the tires on the Scheduler... 
" + + "nm0 -> task_0_0 and task_1_0 allocated, used=4G " + + "nm1 -> nothing allocated"); + nm_0.heartbeat(); // task_0_0 and task_1_0 allocated, used=4G + nm_1.heartbeat(); // nothing allocated + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(3 * GB, application_1); + + nm_0.heartbeat(); + nm_1.heartbeat(); + + checkNodeResourceUsage(4*GB, nm_0); // task_0_0 (1G) and task_1_0 (3G) + checkNodeResourceUsage(0*GB, nm_1); // no tasks, 2G available + + LOG.info("Adding new tasks..."); + + Task task_1_1 = new Task(application_1, priority_1, + new String[] {ResourceRequest.ANY}); + application_1.addTask(task_1_1); + + Task task_1_2 = new Task(application_1, priority_1, + new String[] {ResourceRequest.ANY}); + application_1.addTask(task_1_2); + + Task task_1_3 = new Task(application_1, priority_0, + new String[] {ResourceRequest.ANY}); + application_1.addTask(task_1_3); + + application_1.schedule(); + + Task task_0_1 = new Task(application_0, priority_1, + new String[] {host_0, host_1}); + application_0.addTask(task_0_1); + + Task task_0_2 = new Task(application_0, priority_1, + new String[] {host_0, host_1}); + application_0.addTask(task_0_2); + + Task task_0_3 = new Task(application_0, priority_0, + new String[] {ResourceRequest.ANY}); + application_0.addTask(task_0_3); + + application_0.schedule(); + + // Send a heartbeat to kick the tires on the Scheduler + LOG.info("Sending hb from " + nm_0.getHostName()); + nm_0.heartbeat(); // nothing new, used=4G + + LOG.info("Sending hb from " + nm_1.getHostName()); + nm_1.heartbeat(); // task_0_3, used=2G + + // Get allocations from the scheduler + LOG.info("Trying to allocate..."); + application_0.schedule(); + checkApplicationResourceUsage(3 * GB, application_0); + application_1.schedule(); + checkApplicationResourceUsage(3 * GB, application_1); + nm_0.heartbeat(); + nm_1.heartbeat(); + checkNodeResourceUsage(4*GB, nm_0); + checkNodeResourceUsage(2*GB, nm_1); + + // Complete tasks + LOG.info("Finishing up task_0_0"); + application_0.finishTask(task_0_0); // Now task_0_1 + application_0.schedule(); + application_1.schedule(); + nm_0.heartbeat(); + nm_1.heartbeat(); + checkApplicationResourceUsage(3 * GB, application_0); + checkApplicationResourceUsage(3 * GB, application_1); + checkNodeResourceUsage(4*GB, nm_0); + checkNodeResourceUsage(2*GB, nm_1); + + LOG.info("Finishing up task_1_0"); + application_1.finishTask(task_1_0); // Now task_0_2 + application_0.schedule(); // final overcommit for app0 caused here + application_1.schedule(); + nm_0.heartbeat(); // final overcommit for app0 occurs here + nm_1.heartbeat(); + checkApplicationResourceUsage(4 * GB, application_0); + checkApplicationResourceUsage(0 * GB, application_1); + //checkNodeResourceUsage(1*GB, nm_0); // final over-commit -> rm.node->1G, test.node=2G + checkNodeResourceUsage(2*GB, nm_1); + + LOG.info("Finishing up task_0_3"); + application_0.finishTask(task_0_3); // No more + application_0.schedule(); + application_1.schedule(); + nm_0.heartbeat(); + nm_1.heartbeat(); + checkApplicationResourceUsage(2 * GB, application_0); + checkApplicationResourceUsage(0 * GB, application_1); + //checkNodeResourceUsage(2*GB, nm_0); // final over-commit, rm.node->1G, test.node->2G + checkNodeResourceUsage(0*GB, nm_1); + + LOG.info("Finishing up task_0_1"); + application_0.finishTask(task_0_1); + application_0.schedule(); + application_1.schedule(); 
+ nm_0.heartbeat(); + nm_1.heartbeat(); + checkApplicationResourceUsage(1 * GB, application_0); + checkApplicationResourceUsage(0 * GB, application_1); + + LOG.info("Finishing up task_0_2"); + application_0.finishTask(task_0_2); // now task_1_3 can go! + application_0.schedule(); + application_1.schedule(); + nm_0.heartbeat(); + nm_1.heartbeat(); + checkApplicationResourceUsage(0 * GB, application_0); + checkApplicationResourceUsage(4 * GB, application_1); + + LOG.info("Finishing up task_1_3"); + application_1.finishTask(task_1_3); // now task_1_1 + application_0.schedule(); + application_1.schedule(); + nm_0.heartbeat(); + nm_1.heartbeat(); + checkApplicationResourceUsage(0 * GB, application_0); + checkApplicationResourceUsage(3 * GB, application_1); + + LOG.info("Finishing up task_1_1"); + application_1.finishTask(task_1_1); + application_0.schedule(); + application_1.schedule(); + nm_0.heartbeat(); + nm_1.heartbeat(); + checkApplicationResourceUsage(0 * GB, application_0); + checkApplicationResourceUsage(3 * GB, application_1); + + LOG.info("--- END: testFifoScheduler ---"); + } + + @SuppressWarnings("resource") + @Test + public void testBlackListNodes() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler(); + + String host = "127.0.0.1"; + RMNode node = + MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); + fs.handle(new NodeAddedSchedulerEvent(node)); + + ApplicationId appId = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 1); + SchedulerEvent appEvent = + new AppAddedSchedulerEvent(appId, "default", + "user"); + fs.handle(appEvent); + SchedulerEvent attemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId, false); + fs.handle(attemptEvent); + + // Verify the blacklist can be updated independent of requesting containers + fs.allocate(appAttemptId, Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(host), null); + Assert.assertTrue(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); + fs.allocate(appAttemptId, Collections.emptyList(), + Collections.emptyList(), null, + Collections.singletonList(host)); + Assert.assertFalse(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); + rm.stop(); + } + + @Test + public void testGetAppsInQueue() throws Exception { + Application application_0 = new Application("user_0", resourceManager); + application_0.submit(); + + Application application_1 = new Application("user_0", resourceManager); + application_1.submit(); + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + List appsInDefault = scheduler.getAppsInQueue("default"); + assertTrue(appsInDefault.contains(application_0.getApplicationAttemptId())); + assertTrue(appsInDefault.contains(application_1.getApplicationAttemptId())); + assertEquals(2, appsInDefault.size()); + + Assert.assertNull(scheduler.getAppsInQueue("someotherqueue")); + } + + @Test + public void testAddAndRemoveAppFromFiFoScheduler() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler(); + 
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(fs.applications, + fs, "queue"); + } + + private void checkApplicationResourceUsage(int expected, + Application application) { + Assert.assertEquals(expected, application.getUsedResources().getMemory()); + } + + private void checkNodeResourceUsage(int expected, + org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) { + Assert.assertEquals(expected, node.getUsed().getMemory()); + node.checkResourceUsage(); + } + + public static void main(String[] arg) throws Exception { + TestFifoScheduler t = new TestFifoScheduler(); + t.setUp(); + t.testFifoScheduler(); + t.tearDown(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java index ab6bb35fdc3..349bae4c384 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java @@ -41,9 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -1387,31 +1384,30 @@ public class TestRMWebServicesApps extends JerseyTest { rm.stop(); } - @Test + @Test (timeout = 20000) public void testMultipleAppAttempts() throws JSONException, Exception { rm.start(); MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 8192); RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1"); - amNodeManager.nodeHeartbeat(true); - rm.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(), - RMAppAttemptState.ALLOCATED); + MockAM am = MockRM.launchAM(app1, rm, amNodeManager); int maxAppAttempts = rm.getConfig().getInt( YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); assertTrue(maxAppAttempts > 1); - int retriesLeft = maxAppAttempts; - while (--retriesLeft > 0) { - RMAppEvent event = - new RMAppFailedAttemptEvent(app1.getApplicationId(), - RMAppEventType.ATTEMPT_FAILED, "", false); - app1.handle(event); + int numAttempt = 1; + while (true) { + // fail the AM by sending CONTAINER_FINISHED event without registering. + amNodeManager.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FAILED); + if (numAttempt == maxAppAttempts) { + rm.waitForState(app1.getApplicationId(), RMAppState.FAILED); + break; + } + // wait for app to start a new attempt. 
rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); - amNodeManager.nodeHeartbeat(true); + am = MockRM.launchAM(app1, rm, amNodeManager); + numAttempt++; } - // kick the scheduler to allocate the am container. - amNodeManager.nodeHeartbeat(true); - rm.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(), - RMAppAttemptState.ALLOCATED); assertEquals("incorrect number of attempts", maxAppAttempts, app1.getAppAttempts().values().size()); testAppAttemptsHelper(app1.getApplicationId().toString(), app1,