diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6900a35097f..26245f58f9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -344,17 +344,47 @@ public class YarnConfiguration extends Configuration { YARN_PREFIX + "distributed-scheduling.top-k"; public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10; - /** Frequency for computing Top K Best Nodes */ - public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS = - YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms"; - public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000; + /** Frequency for computing least loaded NMs. */ + public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS = + YARN_PREFIX + "nm-container-queuing.sorting-nodes-interval-ms"; + public static final long + NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT = 1000; - /** Comparator for determining Node Load for Distributed Scheduling */ - public static final String DIST_SCHEDULING_TOP_K_COMPARATOR = - YARN_PREFIX + "distributed-scheduling.top-k-comparator"; - public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT = + /** Comparator for determining Node Load for Distributed Scheduling. */ + public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR = + YARN_PREFIX + "nm-container-queuing.load-comparator"; + public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT = "QUEUE_LENGTH"; + /** Value of standard deviation used for calculation of queue limit + * thresholds. */ + public static final String NM_CONTAINER_QUEUING_LIMIT_STDEV = + YARN_PREFIX + "nm-container-queuing.queue-limit-stdev"; + public static final float NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT = + 1.0f; + + /** Min length of container queue at NodeManager. */ + public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH = + YARN_PREFIX + "nm-container-queuing.min-queue-length"; + public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT = 1; + + /** Max length of container queue at NodeManager. */ + public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH = + YARN_PREFIX + "nm-container-queuing.max-queue-length"; + public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10; + + /** Min wait time of container queue at NodeManager. */ + public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS = + YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms"; + public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT = + 1; + + /** Max wait time of container queue at NodeManager. */ + public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS = + YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms"; + public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT = + 10; + /** * Enable/disable intermediate-data encryption at YARN level. For now, this * only is used by the FileSystemRMStateStore to setup right file-system diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index c92a276d09d..61b698df514 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -135,9 +135,19 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { configurationPrefixToSkipCompare .add(YarnConfiguration.DIST_SCHEDULING_TOP_K); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS); + .add(YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR); + .add(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV); // Set by container-executor.cfg configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index d447a2e3ccc..386353a1204 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -82,4 +83,7 @@ public interface NodeHeartbeatResponse { List getContainersToDecrease(); void addAllContainersToDecrease(Collection containersToDecrease); + + ContainerQueuingLimit getContainerQueuingLimit(); + void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 81621774866..3422697ccd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; @@ -49,8 +50,10 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatR import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.impl.pb.ContainerQueuingLimitPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -68,6 +71,7 @@ public class NodeHeartbeatResponsePBImpl extends private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; + private ContainerQueuingLimit containerQueuingLimit = null; private List containersToDecrease = null; private List containersToSignal = null; @@ -105,6 +109,10 @@ public class NodeHeartbeatResponsePBImpl extends builder.setNmTokenMasterKey( convertToProtoFormat(this.nmTokenMasterKey)); } + if (this.containerQueuingLimit != null) { + builder.setContainerQueuingLimit( + convertToProtoFormat(this.containerQueuingLimit)); + } if (this.systemCredentials != null) { addSystemCredentialsToProto(); } @@ -224,6 +232,30 @@ public class NodeHeartbeatResponsePBImpl extends this.nmTokenMasterKey = masterKey; } + @Override + public ContainerQueuingLimit getContainerQueuingLimit() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerQueuingLimit != null) { + return this.containerQueuingLimit; + } + if (!p.hasContainerQueuingLimit()) { + return null; + } + this.containerQueuingLimit = + convertFromProtoFormat(p.getContainerQueuingLimit()); + return this.containerQueuingLimit; + } + + @Override + public void setContainerQueuingLimit(ContainerQueuingLimit + containerQueuingLimit) { + maybeInitBuilder(); + if (containerQueuingLimit == null) { + builder.clearContainerQueuingLimit(); + } + this.containerQueuingLimit = containerQueuingLimit; + } + @Override public NodeAction getNodeAction() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; @@ -674,6 +706,16 @@ public class NodeHeartbeatResponsePBImpl extends builder.addAllContainersToSignal(iterable); } + private ContainerQueuingLimit convertFromProtoFormat( + ContainerQueuingLimitProto p) { + return new ContainerQueuingLimitPBImpl(p); + } + + private ContainerQueuingLimitProto convertToProtoFormat( + ContainerQueuingLimit c) { + return ((ContainerQueuingLimitPBImpl)c).getProto(); + } + private SignalContainerRequestPBImpl convertFromProtoFormat( SignalContainerRequestProto p) { return new SignalContainerRequestPBImpl(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java new file mode 100644 index 00000000000..a8ae92766a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.yarn.util.Records; + +/** + * Used to hold max wait time / queue length information to be + * passed back to the NodeManager. + */ +public abstract class ContainerQueuingLimit { + + public static ContainerQueuingLimit newInstance() { + ContainerQueuingLimit containerQueuingLimit = + Records.newRecord(ContainerQueuingLimit.class); + containerQueuingLimit.setMaxQueueLength(-1); + containerQueuingLimit.setMaxQueueWaitTimeInMs(-1); + return containerQueuingLimit; + } + + public abstract int getMaxQueueLength(); + + public abstract void setMaxQueueLength(int queueLength); + + public abstract int getMaxQueueWaitTimeInMs(); + + public abstract void setMaxQueueWaitTimeInMs(int waitTime); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java new file mode 100644 index 00000000000..d071a0852ce --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; + +/** + * Implementation of ContainerQueuingLimit interface. + */ +public class ContainerQueuingLimitPBImpl extends ContainerQueuingLimit { + + private ContainerQueuingLimitProto proto = + ContainerQueuingLimitProto.getDefaultInstance(); + private ContainerQueuingLimitProto.Builder builder = null; + private boolean viaProto = false; + + public ContainerQueuingLimitPBImpl() { + builder = ContainerQueuingLimitProto.newBuilder(); + } + + public ContainerQueuingLimitPBImpl(ContainerQueuingLimitProto proto) { + this.proto = proto; + this.viaProto = true; + } + + public ContainerQueuingLimitProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerQueuingLimitProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getMaxQueueWaitTimeInMs() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxQueueWaitTimeInMs(); + } + + @Override + public void setMaxQueueWaitTimeInMs(int waitTime) { + maybeInitBuilder(); + builder.setMaxQueueWaitTimeInMs(waitTime); + } + + @Override + public int getMaxQueueLength() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxQueueLength(); + } + + @Override + public void setMaxQueueLength(int queueLength) { + maybeInitBuilder(); + builder.setMaxQueueLength(queueLength); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 99c3784471f..a7e5a8622a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -102,6 +102,12 @@ message NodeHeartbeatResponseProto { repeated ContainerProto containers_to_decrease = 12; repeated SignalContainerRequestProto containers_to_signal = 13; optional ResourceProto resource = 14; + optional ContainerQueuingLimitProto container_queuing_limit = 15; +} + +message ContainerQueuingLimitProto { + optional int32 max_queue_length = 1; + optional int32 max_queue_wait_time_in_ms = 2; } message SystemCredentialsForAppsProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 626d0a15a85..cfcf1bdbf04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -23,13 +23,13 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -82,7 +82,7 @@ public interface Context { NodeHealthStatus getNodeHealthStatus(); - ContainerManagementProtocol getContainerManager(); + ContainerManager getContainerManager(); NodeResourceMonitor getNodeResourceMonitor(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index b48706d6151..6ca7ffe6a2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -47,7 +47,6 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -465,7 +465,7 @@ public class NodeManager extends CompositeService private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; - private ContainerManagementProtocol containerManager; + private ContainerManager containerManager; private NodeResourceMonitor nodeResourceMonitor; private final LocalDirsHandlerService dirsHandler; private final ApplicationACLsManager aclsManager; @@ -555,11 +555,11 @@ public class NodeManager extends CompositeService } @Override - public ContainerManagementProtocol getContainerManager() { + public ContainerManager getContainerManager() { return this.containerManager; } - public void setContainerManager(ContainerManagementProtocol containerManager) { + public void setContainerManager(ContainerManager containerManager) { this.containerManager = containerManager; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index deb361d8eb1..a72f0d1a084 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -71,13 +71,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; + +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; @@ -411,8 +412,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements LOG.info(successfullRegistrationMsg); LOG.info("Notifying ContainerManager to unblock new container-requests"); - ((ContainerManagerImpl) this.context.getContainerManager()) - .setBlockNewContainerRequests(false); + this.context.getContainerManager().setBlockNewContainerRequests(false); } private List createKeepAliveApplicationList() { @@ -475,10 +475,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements * @return Resource utilization of all the containers. */ private ResourceUtilization getContainersUtilization() { - ContainerManagerImpl containerManager = - (ContainerManagerImpl) this.context.getContainerManager(); ContainersMonitor containersMonitor = - containerManager.getContainersMonitor(); + this.context.getContainerManager().getContainersMonitor(); return containersMonitor.getContainersUtilization(); } @@ -751,7 +749,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements Set nodeLabelsForHeartbeat = nodeLabelsHandler.getNodeLabelsForHeartbeat(); NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); - NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, NodeStatusUpdaterImpl.this.context @@ -776,82 +773,70 @@ public class NodeStatusUpdaterImpl extends AbstractService implements nextHeartBeatInterval = response.getNextHeartBeatInterval(); updateMasterKeys(response); - if (response.getNodeAction() == NodeAction.SHUTDOWN) { - LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of" - + " heartbeat, hence shutting down."); - LOG.warn("Message from ResourceManager: " - + response.getDiagnosticsMessage()); - context.setDecommissioned(true); - dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); - break; - } - if (response.getNodeAction() == NodeAction.RESYNC) { - LOG.warn("Node is out of sync with ResourceManager," - + " hence resyncing."); - LOG.warn("Message from ResourceManager: " - + response.getDiagnosticsMessage()); - // Invalidate the RMIdentifier while resync - NodeStatusUpdaterImpl.this.rmIdentifier = - ResourceManagerConstants.RM_INVALID_IDENTIFIER; - dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.RESYNC)); - pendingCompletedContainers.clear(); - break; - } + if (!handleShutdownOrResyncCommand(response)) { + nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels( + response); - nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(response); - - // Explicitly put this method after checking the resync response. We - // don't want to remove the completed containers before resync - // because these completed containers will be reported back to RM - // when NM re-registers with RM. - // Only remove the cleanedup containers that are acked - removeOrTrackCompletedContainersFromContext(response + // Explicitly put this method after checking the resync + // response. We + // don't want to remove the completed containers before resync + // because these completed containers will be reported back to RM + // when NM re-registers with RM. + // Only remove the cleanedup containers that are acked + removeOrTrackCompletedContainersFromContext(response .getContainersToBeRemovedFromNM()); - logAggregationReportForAppsTempList.clear(); - lastHeartbeatID = response.getResponseId(); - List containersToCleanup = response - .getContainersToCleanup(); - if (!containersToCleanup.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrCompletedContainersEvent(containersToCleanup, - CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER)); - } - List appsToCleanup = - response.getApplicationsToCleanup(); - //Only start tracking for keepAlive on FINISH_APP - trackAppsForKeepAlive(appsToCleanup); - if (!appsToCleanup.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrCompletedAppsEvent(appsToCleanup, - CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); - } + logAggregationReportForAppsTempList.clear(); + lastHeartbeatID = response.getResponseId(); + List containersToCleanup = response + .getContainersToCleanup(); + if (!containersToCleanup.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrCompletedContainersEvent(containersToCleanup, + CMgrCompletedContainersEvent.Reason + .BY_RESOURCEMANAGER)); + } + List appsToCleanup = + response.getApplicationsToCleanup(); + //Only start tracking for keepAlive on FINISH_APP + trackAppsForKeepAlive(appsToCleanup); + if (!appsToCleanup.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrCompletedAppsEvent(appsToCleanup, + CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); + } + Map systemCredentials = + response.getSystemCredentialsForApps(); + if (systemCredentials != null && !systemCredentials.isEmpty()) { + ((NMContext) context).setSystemCrendentialsForApps( + parseCredentials(systemCredentials)); + } + List + containersToDecrease = response.getContainersToDecrease(); + if (!containersToDecrease.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrDecreaseContainersResourceEvent( + containersToDecrease) + ); + } - Map systemCredentials = - response.getSystemCredentialsForApps(); - if (systemCredentials != null && !systemCredentials.isEmpty()) { - ((NMContext) context) - .setSystemCrendentialsForApps(parseCredentials(systemCredentials)); - } + // SignalContainer request originally comes from end users via + // ClientRMProtocol's SignalContainer. Forward the request to + // ContainerManager which will dispatch the event to + // ContainerLauncher. + List containersToSignal = response + .getContainersToSignalList(); + if (containersToSignal.size() != 0) { + dispatcher.getEventHandler().handle( + new CMgrSignalContainersEvent(containersToSignal)); + } - List - containersToDecrease = response.getContainersToDecrease(); - if (!containersToDecrease.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrDecreaseContainersResourceEvent(containersToDecrease) - ); - } - - // SignalContainer request originally comes from end users via - // ClientRMProtocol's SignalContainer. Forward the request to - // ContainerManager which will dispatch the event to ContainerLauncher. - List containersToSignal = response - .getContainersToSignalList(); - if (containersToSignal.size() != 0) { - dispatcher.getEventHandler().handle( - new CMgrSignalContainersEvent(containersToSignal)); + // Update QueuingLimits if ContainerManager supports queuing + ContainerQueuingLimit queuingLimit = + response.getContainerQueuingLimit(); + if (queuingLimit != null) { + context.getContainerManager().updateQueuingLimit(queuingLimit); + } } // Handling node resource update case. Resource newResource = response.getResource(); @@ -908,6 +893,34 @@ public class NodeStatusUpdaterImpl extends AbstractService implements statusUpdater.start(); } + private boolean handleShutdownOrResyncCommand( + NodeHeartbeatResponse response) { + if (response.getNodeAction() == NodeAction.SHUTDOWN) { + LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of" + + " heartbeat, hence shutting down."); + LOG.warn("Message from ResourceManager: " + + response.getDiagnosticsMessage()); + context.setDecommissioned(true); + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); + return true; + } + if (response.getNodeAction() == NodeAction.RESYNC) { + LOG.warn("Node is out of sync with ResourceManager," + + " hence resyncing."); + LOG.warn("Message from ResourceManager: " + + response.getDiagnosticsMessage()); + // Invalidate the RMIdentifier while resync + NodeStatusUpdaterImpl.this.rmIdentifier = + ResourceManagerConstants.RM_INVALID_IDENTIFIER; + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.RESYNC)); + pendingCompletedContainers.clear(); + return true; + } + return false; + } + private List getLogAggregationReportsForApps( ConcurrentLinkedQueue lastestLogAggregationStatus) { LogAggregationReport status; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java new file mode 100644 index 00000000000..0da02b3bec1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor + .ContainersMonitor; + +/** + * The ContainerManager is an entity that manages the life cycle of Containers. + */ +public interface ContainerManager extends ServiceStateChangeListener, + ContainerManagementProtocol, + EventHandler { + + ContainersMonitor getContainersMonitor(); + + void updateQueuingLimit(ContainerQueuingLimit queuingLimit); + + void setBlockNewContainerRequests(boolean blockNewContainerRequests); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 4383d2bfc0d..e1c4131e58f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -53,7 +53,6 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; -import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; @@ -95,6 +94,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Containe import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; @@ -150,8 +150,7 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.yarn.util.resource.Resources; public class ContainerManagerImpl extends CompositeService implements - ServiceStateChangeListener, ContainerManagementProtocol, - EventHandler { + ContainerManager { /** * Extra duration to wait for applications to be killed on shutdown. @@ -410,6 +409,7 @@ public class ContainerManagerImpl extends CompositeService implements } } + @Override public ContainersMonitor getContainersMonitor() { return this.containersMonitor; } @@ -1398,6 +1398,7 @@ public class ContainerManagerImpl extends CompositeService implements } } + @Override public void setBlockNewContainerRequests(boolean blockNewContainerRequests) { this.blockNewContainerRequests.set(blockNewContainerRequests); } @@ -1434,4 +1435,9 @@ public class ContainerManagerImpl extends CompositeService implements protected boolean isServiceStopped() { return serviceStopped; } + + @Override + public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) { + LOG.trace("Implementation does not support queuing of Containers !!"); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index 13d6efb3f39..5b1b77a721b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -83,6 +84,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { private Queue queuedOpportunisticContainers; private Set opportunisticContainersToKill; + private final ContainerQueuingLimit queuingLimit; public QueuingContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, @@ -95,6 +97,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>(); this.opportunisticContainersToKill = Collections.synchronizedSet( new HashSet()); + this.queuingLimit = ContainerQueuingLimit.newInstance(); } @Override @@ -468,7 +471,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { + "will be added to the queued containers."); AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo( - token, null, rcs.getStartRequest(), token.getExecutionType(), + token, rcs.getStartRequest(), token.getExecutionType(), token.getResource(), getConfig()); this.context.getQueuingContext().getQueuedContainers().put( @@ -526,6 +529,41 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { } } + @Override + public void updateQueuingLimit(ContainerQueuingLimit limit) { + this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength()); + // TODO: Include wait time as well once it is implemented + if (this.queuingLimit.getMaxQueueLength() > -1) { + shedQueuedOpportunisticContainers(); + } + } + + private void shedQueuedOpportunisticContainers() { + int numAllowed = this.queuingLimit.getMaxQueueLength(); + Iterator containerIter = + queuedOpportunisticContainers.iterator(); + while (containerIter.hasNext()) { + AllocatedContainerInfo cInfo = containerIter.next(); + if (numAllowed <= 0) { + containerIter.remove(); + ContainerTokenIdentifier containerTokenIdentifier = this.context + .getQueuingContext().getQueuedContainers().remove( + cInfo.getContainerTokenIdentifier().getContainerID()); + // The Container might have already started while we were + // iterating.. + if (containerTokenIdentifier != null) { + this.context.getQueuingContext().getKilledQueuedContainers() + .putIfAbsent(cInfo.getContainerTokenIdentifier(), + "Container De-queued to meet global queuing limits. " + + "Max Queue length[" + + this.queuingLimit.getMaxQueueLength() + "]"); + } + } + numAllowed--; + } + } + + static class AllocatedContainerInfo { private final ContainerTokenIdentifier containerTokenIdentifier; private final StartContainerRequest startRequest; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index ce405f83706..6ff74c9f53e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -642,7 +643,7 @@ public abstract class BaseAMRMProxyTest { } @Override - public ContainerManagementProtocol getContainerManager() { + public ContainerManager getContainerManager() { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java index 4fd62d0b2da..5fb05ca0f6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java @@ -24,13 +24,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import java.util.List; +/** + * Implementations of this class are notified of changes to the cluster's state, + * such as node addition, removal and updates. + */ public interface ClusterMonitor { void addNode(List containerStatuses, RMNode rmNode); void removeNode(RMNode removedRMNode); - void nodeUpdate(RMNode rmNode); + void updateNode(RMNode rmNode); void updateNodeResource(RMNode rmNode, ResourceOption resourceOption); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java index 170d91ab6e7..a93f6835159 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java @@ -46,8 +46,10 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed - .TopKNodeSelector; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -57,7 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -76,30 +77,64 @@ public class DistributedSchedulingService extends ApplicationMasterService private static final Log LOG = LogFactory.getLog(DistributedSchedulingService.class); - private final TopKNodeSelector clusterMonitor; + private final NodeQueueLoadMonitor nodeMonitor; private final ConcurrentHashMap> rackToNode = new ConcurrentHashMap<>(); private final ConcurrentHashMap> hostToNode = new ConcurrentHashMap<>(); + private final int k; public DistributedSchedulingService(RMContext rmContext, YarnScheduler scheduler) { super(DistributedSchedulingService.class.getName(), rmContext, scheduler); - int k = rmContext.getYarnConfiguration().getInt( + this.k = rmContext.getYarnConfiguration().getInt( YarnConfiguration.DIST_SCHEDULING_TOP_K, YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT); - long topKComputationInterval = rmContext.getYarnConfiguration().getLong( - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS, - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT); - TopKNodeSelector.TopKComparator comparator = - TopKNodeSelector.TopKComparator.valueOf( + long nodeSortInterval = rmContext.getYarnConfiguration().getLong( + YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT); + NodeQueueLoadMonitor.LoadComparator comparator = + NodeQueueLoadMonitor.LoadComparator.valueOf( rmContext.getYarnConfiguration().get( - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR, - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT)); - TopKNodeSelector topKSelector = - new TopKNodeSelector(k, topKComputationInterval, comparator); - this.clusterMonitor = topKSelector; + YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, + YarnConfiguration. + NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT)); + + NodeQueueLoadMonitor topKSelector = + new NodeQueueLoadMonitor(nodeSortInterval, comparator); + + float sigma = rmContext.getYarnConfiguration() + .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV, + YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT); + + int limitMin, limitMax; + + if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) { + limitMin = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT); + } else { + limitMin = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT); + } + + topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); + this.nodeMonitor = topKSelector; } @Override @@ -189,7 +224,7 @@ public class DistributedSchedulingService extends ApplicationMasterService // Set nodes to be used for scheduling dsResp.setNodesForScheduling( - new ArrayList<>(this.clusterMonitor.selectNodes())); + this.nodeMonitor.selectLeastLoadedNodes(this.k)); return dsResp; } @@ -201,7 +236,7 @@ public class DistributedSchedulingService extends ApplicationMasterService (DistSchedAllocateResponse.class); dsResp.setAllocateResponse(response); dsResp.setNodesForScheduling( - new ArrayList<>(this.clusterMonitor.selectNodes())); + this.nodeMonitor.selectLeastLoadedNodes(this.k)); return dsResp; } @@ -229,67 +264,72 @@ public class DistributedSchedulingService extends ApplicationMasterService @Override public void handle(SchedulerEvent event) { switch (event.getType()) { - case NODE_ADDED: - if (!(event instanceof NodeAddedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; - clusterMonitor.addNode(nodeAddedEvent.getContainerReports(), - nodeAddedEvent.getAddedRMNode()); - addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), - nodeAddedEvent.getAddedRMNode().getNodeID()); - addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), - nodeAddedEvent.getAddedRMNode().getNodeID()); - break; - case NODE_REMOVED: - if (!(event instanceof NodeRemovedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeRemovedSchedulerEvent nodeRemovedEvent = - (NodeRemovedSchedulerEvent)event; - clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); - removeFromMapping(rackToNode, - nodeRemovedEvent.getRemovedRMNode().getRackName(), - nodeRemovedEvent.getRemovedRMNode().getNodeID()); - removeFromMapping(hostToNode, - nodeRemovedEvent.getRemovedRMNode().getHostName(), - nodeRemovedEvent.getRemovedRMNode().getNodeID()); - break; - case NODE_UPDATE: - if (!(event instanceof NodeUpdateSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode()); - break; - case NODE_RESOURCE_UPDATE: - if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = - (NodeResourceUpdateSchedulerEvent)event; - clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), - nodeResourceUpdatedEvent.getResourceOption()); - break; + case NODE_ADDED: + if (!(event instanceof NodeAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; + nodeMonitor.addNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); + addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + break; + case NODE_REMOVED: + if (!(event instanceof NodeRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeRemovedSchedulerEvent nodeRemovedEvent = + (NodeRemovedSchedulerEvent) event; + nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); + removeFromMapping(rackToNode, + nodeRemovedEvent.getRemovedRMNode().getRackName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + removeFromMapping(hostToNode, + nodeRemovedEvent.getRemovedRMNode().getHostName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + break; + case NODE_UPDATE: + if (!(event instanceof NodeUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) + event; + nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode()); + break; + case NODE_RESOURCE_UPDATE: + if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent) event; + nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + break; - // <-- IGNORED EVENTS : START --> - case APP_ADDED: - break; - case APP_REMOVED: - break; - case APP_ATTEMPT_ADDED: - break; - case APP_ATTEMPT_REMOVED: - break; - case CONTAINER_EXPIRED: - break; - case NODE_LABELS_UPDATE: - break; - // <-- IGNORED EVENTS : END --> - default: - LOG.error("Unknown event arrived at DistributedSchedulingService: " - + event.toString()); + // <-- IGNORED EVENTS : START --> + case APP_ADDED: + break; + case APP_REMOVED: + break; + case APP_ATTEMPT_ADDED: + break; + case APP_ATTEMPT_REMOVED: + break; + case CONTAINER_EXPIRED: + break; + case NODE_LABELS_UPDATE: + break; + // <-- IGNORED EVENTS : END --> + default: + LOG.error("Unknown event arrived at DistributedSchedulingService: " + + event.toString()); } + } + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return nodeMonitor.getThresholdCalculator(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index f50da3bfd79..b063ecb779f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -139,4 +141,6 @@ public interface RMContext { void setLeaderElectorService(LeaderElectorService elector); LeaderElectorService getLeaderElectorService(); + + QueueLimitCalculator getNodeManagerQueueLimitCalculator(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ec2aeb7f0e7..d22838851c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -74,6 +76,8 @@ public class RMContextImpl implements RMContext { private SystemMetricsPublisher systemMetricsPublisher; private LeaderElectorService elector; + private QueueLimitCalculator queueLimitCalculator; + /** * Default constructor. To be used in conjunction with setter methods for * individual fields. @@ -472,4 +476,14 @@ public class RMContextImpl implements RMContext { public void setQueuePlacementManager(PlacementManager placementMgr) { this.activeServiceContext.setQueuePlacementManager(placementMgr); } + + @Override + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return this.queueLimitCalculator; + } + + public void setContainerQueueLimitCalculator( + QueueLimitCalculator limitCalculator) { + this.queueLimitCalculator = limitCalculator; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6c80a589079..f9d332511f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1154,6 +1154,8 @@ public class ResourceManager extends CompositeService implements Recoverable { addService(distSchedulerEventDispatcher); rmDispatcher.register(SchedulerEventType.class, distSchedulerEventDispatcher); + this.rmContext.setContainerQueueLimitCalculator( + distributedSchedulingService.getNodeManagerQueueLimitCalculator()); return distributedSchedulingService; } return new ApplicationMasterService(this.rmContext, scheduler); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index ed4d5c06447..cfb050a1111 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -563,6 +563,13 @@ public class ResourceTrackerService extends AbstractService implements nodeHeartBeatResponse.setResource(capability); } + // 7. Send Container Queuing Limits back to the Node. This will be used by + // the node to truncate the number of Containers queued for execution. + if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) { + nodeHeartBeatResponse.setContainerQueuingLimit( + this.rmContext.getNodeManagerQueueLimitCalculator() + .createContainerQueuingLimit()); + } return nodeHeartBeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java similarity index 61% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index 7e24687f74b..21f4f6e16ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -31,36 +31,47 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; -import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -public class TopKNodeSelector implements ClusterMonitor { +/** + * The NodeQueueLoadMonitor keeps track of load metrics (such as queue length + * and total wait time) associated with Container Queues on the Node Manager. + * It uses this information to periodically sort the Nodes from least to most + * loaded. + */ +public class NodeQueueLoadMonitor implements ClusterMonitor { - final static Log LOG = LogFactory.getLog(TopKNodeSelector.class); + final static Log LOG = LogFactory.getLog(NodeQueueLoadMonitor.class); - public enum TopKComparator implements Comparator { - WAIT_TIME, - QUEUE_LENGTH; + /** + * The comparator used to specify the metric against which the load + * of two Nodes are compared. + */ + public enum LoadComparator implements Comparator { + QUEUE_LENGTH, + QUEUE_WAIT_TIME; @Override public int compare(ClusterNode o1, ClusterNode o2) { - if (getQuant(o1) == getQuant(o2)) { + if (getMetric(o1) == getMetric(o2)) { return o1.timestamp < o2.timestamp ? +1 : -1; } - return getQuant(o1) > getQuant(o2) ? +1 : -1; + return getMetric(o1) > getMetric(o2) ? +1 : -1; } - private int getQuant(ClusterNode c) { - return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength; + public int getMetric(ClusterNode c) { + return (this == QUEUE_LENGTH) ? c.queueLength : c.queueWaitTime; } } static class ClusterNode { - int queueTime = -1; - int waitQueueLength = 0; + int queueLength = 0; + int queueWaitTime = -1; double timestamp; final NodeId nodeId; @@ -69,13 +80,13 @@ public class TopKNodeSelector implements ClusterMonitor { updateTimestamp(); } - public ClusterNode setQueueTime(int queueTime) { - this.queueTime = queueTime; + public ClusterNode setQueueLength(int qLength) { + this.queueLength = qLength; return this; } - public ClusterNode setWaitQueueLength(int queueLength) { - this.waitQueueLength = queueLength; + public ClusterNode setQueueWaitTime(int wTime) { + this.queueWaitTime = wTime; return this; } @@ -85,34 +96,37 @@ public class TopKNodeSelector implements ClusterMonitor { } } - private final int k; - private final List topKNodes; private final ScheduledExecutorService scheduledExecutor; - private final HashMap clusterNodes = new HashMap<>(); - private final Comparator comparator; + + private final List sortedNodes; + private final Map clusterNodes = + new ConcurrentHashMap<>(); + private final LoadComparator comparator; + private QueueLimitCalculator thresholdCalculator; Runnable computeTask = new Runnable() { @Override public void run() { - synchronized (topKNodes) { - topKNodes.clear(); - topKNodes.addAll(computeTopKNodes()); + synchronized (sortedNodes) { + sortedNodes.clear(); + sortedNodes.addAll(sortNodes()); + if (thresholdCalculator != null) { + thresholdCalculator.update(); + } } } }; @VisibleForTesting - TopKNodeSelector(int k, TopKComparator comparator) { - this.k = k; - this.topKNodes = new ArrayList<>(); + NodeQueueLoadMonitor(LoadComparator comparator) { + this.sortedNodes = new ArrayList<>(); this.comparator = comparator; this.scheduledExecutor = null; } - public TopKNodeSelector(int k, long nodeComputationInterval, - TopKComparator comparator) { - this.k = k; - this.topKNodes = new ArrayList<>(); + public NodeQueueLoadMonitor(long nodeComputationInterval, + LoadComparator comparator) { + this.sortedNodes = new ArrayList<>(); this.scheduledExecutor = Executors.newScheduledThreadPool(1); this.comparator = comparator; this.scheduledExecutor.scheduleAtFixedRate(computeTask, @@ -120,12 +134,32 @@ public class TopKNodeSelector implements ClusterMonitor { TimeUnit.MILLISECONDS); } + List getSortedNodes() { + return sortedNodes; + } + + public QueueLimitCalculator getThresholdCalculator() { + return thresholdCalculator; + } + + Map getClusterNodes() { + return clusterNodes; + } + + Comparator getComparator() { + return comparator; + } + + public void initThresholdCalculator(float sigma, int limitMin, int limitMax) { + this.thresholdCalculator = + new QueueLimitCalculator(this, sigma, limitMin, limitMax); + } @Override public void addNode(List containerStatuses, RMNode rmNode) { LOG.debug("Node added event from: " + rmNode.getNode().getName()); - // Ignoring this currently : atleast one NODE_UPDATE heartbeat is + // Ignoring this currently : at least one NODE_UPDATE heartbeat is // required to ensure node eligibility. } @@ -143,24 +177,24 @@ public class TopKNodeSelector implements ClusterMonitor { } @Override - public void nodeUpdate(RMNode rmNode) { + public void updateNode(RMNode rmNode) { LOG.debug("Node update event from: " + rmNode.getNodeID()); QueuedContainersStatus queuedContainersStatus = rmNode.getQueuedContainersStatus(); int estimatedQueueWaitTime = queuedContainersStatus.getEstimatedQueueWaitTime(); int waitQueueLength = queuedContainersStatus.getWaitQueueLength(); - // Add nodes to clusterNodes.. if estimatedQueueTime is -1, Ignore node - // UNLESS comparator is based on queue length, in which case, we should add + // Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node + // UNLESS comparator is based on queue length. synchronized (this.clusterNodes) { ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID()); if (currentNode == null) { if (estimatedQueueWaitTime != -1 - || comparator == TopKComparator.QUEUE_LENGTH) { + || comparator == LoadComparator.QUEUE_LENGTH) { this.clusterNodes.put(rmNode.getNodeID(), new ClusterNode(rmNode.getNodeID()) - .setQueueTime(estimatedQueueWaitTime) - .setWaitQueueLength(waitQueueLength)); + .setQueueWaitTime(estimatedQueueWaitTime) + .setQueueLength(waitQueueLength)); LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" + "with queue wait time [" + estimatedQueueWaitTime + "] and " + "wait queue length [" + waitQueueLength + "]"); @@ -171,10 +205,10 @@ public class TopKNodeSelector implements ClusterMonitor { } } else { if (estimatedQueueWaitTime != -1 - || comparator == TopKComparator.QUEUE_LENGTH) { + || comparator == LoadComparator.QUEUE_LENGTH) { currentNode - .setQueueTime(estimatedQueueWaitTime) - .setWaitQueueLength(waitQueueLength) + .setQueueWaitTime(estimatedQueueWaitTime) + .setQueueLength(waitQueueLength) .updateTimestamp(); LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" + "with queue wait time [" + estimatedQueueWaitTime + "] and " + @@ -182,8 +216,8 @@ public class TopKNodeSelector implements ClusterMonitor { } else { this.clusterNodes.remove(rmNode.getNodeID()); LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + currentNode.queueTime + "] and " + - "wait queue length [" + currentNode.waitQueueLength + "]"); + "with queue wait time [" + currentNode.queueWaitTime + "] and " + + "wait queue length [" + currentNode.queueLength + "]"); } } } @@ -192,25 +226,38 @@ public class TopKNodeSelector implements ClusterMonitor { @Override public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) { LOG.debug("Node resource update event from: " + rmNode.getNodeID()); - // Ignoring this currently... + // Ignoring this currently. } + /** + * Returns all Node Ids as ordered list from Least to Most Loaded. + * @return ordered list of nodes + */ public List selectNodes() { - synchronized (this.topKNodes) { - return this.k < this.topKNodes.size() ? - new ArrayList<>(this.topKNodes).subList(0, this.k) : - new ArrayList<>(this.topKNodes); + return selectLeastLoadedNodes(-1); + } + + /** + * Returns 'K' of the least Loaded Node Ids as ordered list. + * @param k max number of nodes to return + * @return ordered list of nodes + */ + public List selectLeastLoadedNodes(int k) { + synchronized (this.sortedNodes) { + return ((k < this.sortedNodes.size()) && (k >= 0)) ? + new ArrayList<>(this.sortedNodes).subList(0, k) : + new ArrayList<>(this.sortedNodes); } } - private List computeTopKNodes() { + private List sortNodes() { synchronized (this.clusterNodes) { ArrayList aList = new ArrayList<>(this.clusterNodes.values()); List retList = new ArrayList<>(); Object[] nodes = aList.toArray(); // Collections.sort would do something similar by calling Arrays.sort // internally but would finally iterate through the input list (aList) - // to reset the value of each element.. Since we don't really care about + // to reset the value of each element. Since we don't really care about // 'aList', we can use the iteration to create the list of nodeIds which // is what we ultimately care about. Arrays.sort(nodes, (Comparator)comparator); @@ -220,4 +267,5 @@ public class TopKNodeSelector implements ClusterMonitor { return retList; } } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java new file mode 100644 index 00000000000..ab3a577d9f8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.ClusterNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.LoadComparator; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class interacts with the NodeQueueLoadMonitor to keep track of the + * mean and standard deviation of the configured metrics (queue length or queue + * wait time) used to characterize the queue load of a specific node. + * The NodeQueueLoadMonitor triggers an update (by calling the + * update() method) every time it performs a re-ordering of + * all nodes. + */ +public class QueueLimitCalculator { + + class Stats { + private final AtomicInteger mean = new AtomicInteger(0); + private final AtomicInteger stdev = new AtomicInteger(0); + + /** + * Not thread safe. Caller should synchronize on sorted nodes list. + */ + void update() { + List sortedNodes = nodeSelector.getSortedNodes(); + if (sortedNodes.size() > 0) { + // Calculate mean + int sum = 0; + for (NodeId n : sortedNodes) { + sum += getMetric(getNode(n)); + } + mean.set(sum / sortedNodes.size()); + + // Calculate stdev + int sqrSumMean = 0; + for (NodeId n : sortedNodes) { + int val = getMetric(getNode(n)); + sqrSumMean += Math.pow(val - mean.get(), 2); + } + stdev.set( + (int) Math.round(Math.sqrt( + sqrSumMean / (float) sortedNodes.size()))); + } + } + + private ClusterNode getNode(NodeId nId) { + return nodeSelector.getClusterNodes().get(nId); + } + + private int getMetric(ClusterNode cn) { + return (cn != null) ? ((LoadComparator)nodeSelector.getComparator()) + .getMetric(cn) : 0; + } + + public int getMean() { + return mean.get(); + } + + public int getStdev() { + return stdev.get(); + } + } + + private final NodeQueueLoadMonitor nodeSelector; + private final float sigma; + private final int rangeMin; + private final int rangeMax; + private final Stats stats = new Stats(); + + QueueLimitCalculator(NodeQueueLoadMonitor selector, float sigma, + int rangeMin, int rangeMax) { + this.nodeSelector = selector; + this.sigma = sigma; + this.rangeMax = rangeMax; + this.rangeMin = rangeMin; + } + + private int determineThreshold() { + return (int) (stats.getMean() + sigma * stats.getStdev()); + } + + void update() { + this.stats.update(); + } + + private int getThreshold() { + int thres = determineThreshold(); + return Math.min(rangeMax, Math.max(rangeMin, thres)); + } + + public ContainerQueuingLimit createContainerQueuingLimit() { + ContainerQueuingLimit containerQueuingLimit = + ContainerQueuingLimit.newInstance(); + if (nodeSelector.getComparator() == LoadComparator.QUEUE_WAIT_TIME) { + containerQueuingLimit.setMaxQueueWaitTimeInMs(getThreshold()); + containerQueuingLimit.setMaxQueueLength(-1); + } else { + containerQueuingLimit.setMaxQueueWaitTimeInMs(-1); + containerQueuingLimit.setMaxQueueLength(getThreshold()); + } + return containerQueuingLimit; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java similarity index 58% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java index aec4e86763b..5f63923051d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.junit.Assert; @@ -27,7 +28,10 @@ import org.mockito.Mockito; import java.util.List; -public class TestTopKNodeSelector { +/** + * Unit tests for NodeQueueLoadMonitor. + */ +public class TestNodeQueueLoadMonitor { static class FakeNodeId extends NodeId { final String host; @@ -62,12 +66,12 @@ public class TestTopKNodeSelector { } @Test - public void testQueueTimeSort() { - TopKNodeSelector selector = new TopKNodeSelector(5, - TopKNodeSelector.TopKComparator.WAIT_TIME); - selector.nodeUpdate(createRMNode("h1", 1, 15, 10)); - selector.nodeUpdate(createRMNode("h2", 2, 5, 10)); - selector.nodeUpdate(createRMNode("h3", 3, 10, 10)); + public void testWaitTimeSort() { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_WAIT_TIME); + selector.updateNode(createRMNode("h1", 1, 15, 10)); + selector.updateNode(createRMNode("h2", 2, 5, 10)); + selector.updateNode(createRMNode("h3", 3, 10, 10)); selector.computeTask.run(); List nodeIds = selector.selectNodes(); System.out.println("1-> " + nodeIds); @@ -76,7 +80,7 @@ public class TestTopKNodeSelector { Assert.assertEquals("h1:1", nodeIds.get(2).toString()); // Now update node3 - selector.nodeUpdate(createRMNode("h3", 3, 2, 10)); + selector.updateNode(createRMNode("h3", 3, 2, 10)); selector.computeTask.run(); nodeIds = selector.selectNodes(); System.out.println("2-> "+ nodeIds); @@ -85,7 +89,7 @@ public class TestTopKNodeSelector { Assert.assertEquals("h1:1", nodeIds.get(2).toString()); // Now send update with -1 wait time - selector.nodeUpdate(createRMNode("h4", 4, -1, 10)); + selector.updateNode(createRMNode("h4", 4, -1, 10)); selector.computeTask.run(); nodeIds = selector.selectNodes(); System.out.println("3-> "+ nodeIds); @@ -97,11 +101,11 @@ public class TestTopKNodeSelector { @Test public void testQueueLengthSort() { - TopKNodeSelector selector = new TopKNodeSelector(5, - TopKNodeSelector.TopKComparator.QUEUE_LENGTH); - selector.nodeUpdate(createRMNode("h1", 1, -1, 15)); - selector.nodeUpdate(createRMNode("h2", 2, -1, 5)); - selector.nodeUpdate(createRMNode("h3", 3, -1, 10)); + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH); + selector.updateNode(createRMNode("h1", 1, -1, 15)); + selector.updateNode(createRMNode("h2", 2, -1, 5)); + selector.updateNode(createRMNode("h3", 3, -1, 10)); selector.computeTask.run(); List nodeIds = selector.selectNodes(); System.out.println("1-> " + nodeIds); @@ -110,7 +114,7 @@ public class TestTopKNodeSelector { Assert.assertEquals("h1:1", nodeIds.get(2).toString()); // Now update node3 - selector.nodeUpdate(createRMNode("h3", 3, -1, 2)); + selector.updateNode(createRMNode("h3", 3, -1, 2)); selector.computeTask.run(); nodeIds = selector.selectNodes(); System.out.println("2-> "+ nodeIds); @@ -119,7 +123,7 @@ public class TestTopKNodeSelector { Assert.assertEquals("h1:1", nodeIds.get(2).toString()); // Now send update with -1 wait time but valid length - selector.nodeUpdate(createRMNode("h4", 4, -1, 20)); + selector.updateNode(createRMNode("h4", 4, -1, 20)); selector.computeTask.run(); nodeIds = selector.selectNodes(); System.out.println("3-> "+ nodeIds); @@ -130,6 +134,50 @@ public class TestTopKNodeSelector { Assert.assertEquals("h4:4", nodeIds.get(3).toString()); } + @Test + public void testContainerQueuingLimit() { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH); + selector.updateNode(createRMNode("h1", 1, -1, 15)); + selector.updateNode(createRMNode("h2", 2, -1, 5)); + selector.updateNode(createRMNode("h3", 3, -1, 10)); + + // Test Mean Calculation + selector.initThresholdCalculator(0, 6, 100); + QueueLimitCalculator calculator = selector.getThresholdCalculator(); + ContainerQueuingLimit containerQueuingLimit = calculator + .createContainerQueuingLimit(); + Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs()); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(10, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs()); + + // Test Limits do not exceed specified max + selector.updateNode(createRMNode("h1", 1, -1, 110)); + selector.updateNode(createRMNode("h2", 2, -1, 120)); + selector.updateNode(createRMNode("h3", 3, -1, 130)); + selector.updateNode(createRMNode("h4", 4, -1, 140)); + selector.updateNode(createRMNode("h5", 5, -1, 150)); + selector.updateNode(createRMNode("h6", 6, -1, 160)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(100, containerQueuingLimit.getMaxQueueLength()); + + // Test Limits do not go below specified min + selector.updateNode(createRMNode("h1", 1, -1, 1)); + selector.updateNode(createRMNode("h2", 2, -1, 2)); + selector.updateNode(createRMNode("h3", 3, -1, 3)); + selector.updateNode(createRMNode("h4", 4, -1, 4)); + selector.updateNode(createRMNode("h5", 5, -1, 5)); + selector.updateNode(createRMNode("h6", 6, -1, 6)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength()); + + } + private RMNode createRMNode(String host, int port, int waitTime, int queueLength) { RMNode node1 = Mockito.mock(RMNode.class);