YARN-2888. Corrective mechanisms for rebalancing NM container queues. (asuresh)
(cherry picked from commit f0ac18d001
)
This commit is contained in:
parent
005dae521c
commit
69a9ce3494
|
@ -344,17 +344,47 @@ public class YarnConfiguration extends Configuration {
|
||||||
YARN_PREFIX + "distributed-scheduling.top-k";
|
YARN_PREFIX + "distributed-scheduling.top-k";
|
||||||
public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
|
public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
|
||||||
|
|
||||||
/** Frequency for computing Top K Best Nodes */
|
/** Frequency for computing least loaded NMs. */
|
||||||
public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS =
|
public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
|
||||||
YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms";
|
YARN_PREFIX + "nm-container-queuing.sorting-nodes-interval-ms";
|
||||||
public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000;
|
public static final long
|
||||||
|
NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT = 1000;
|
||||||
|
|
||||||
/** Comparator for determining Node Load for Distributed Scheduling */
|
/** Comparator for determining Node Load for Distributed Scheduling. */
|
||||||
public static final String DIST_SCHEDULING_TOP_K_COMPARATOR =
|
public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR =
|
||||||
YARN_PREFIX + "distributed-scheduling.top-k-comparator";
|
YARN_PREFIX + "nm-container-queuing.load-comparator";
|
||||||
public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT =
|
public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT =
|
||||||
"QUEUE_LENGTH";
|
"QUEUE_LENGTH";
|
||||||
|
|
||||||
|
/** Value of standard deviation used for calculation of queue limit
|
||||||
|
* thresholds. */
|
||||||
|
public static final String NM_CONTAINER_QUEUING_LIMIT_STDEV =
|
||||||
|
YARN_PREFIX + "nm-container-queuing.queue-limit-stdev";
|
||||||
|
public static final float NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT =
|
||||||
|
1.0f;
|
||||||
|
|
||||||
|
/** Min length of container queue at NodeManager. */
|
||||||
|
public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH =
|
||||||
|
YARN_PREFIX + "nm-container-queuing.min-queue-length";
|
||||||
|
public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT = 1;
|
||||||
|
|
||||||
|
/** Max length of container queue at NodeManager. */
|
||||||
|
public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH =
|
||||||
|
YARN_PREFIX + "nm-container-queuing.max-queue-length";
|
||||||
|
public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
|
||||||
|
|
||||||
|
/** Min wait time of container queue at NodeManager. */
|
||||||
|
public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
|
||||||
|
YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
|
||||||
|
public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT =
|
||||||
|
1;
|
||||||
|
|
||||||
|
/** Max wait time of container queue at NodeManager. */
|
||||||
|
public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =
|
||||||
|
YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
|
||||||
|
public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT =
|
||||||
|
10;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enable/disable intermediate-data encryption at YARN level. For now, this
|
* Enable/disable intermediate-data encryption at YARN level. For now, this
|
||||||
* only is used by the FileSystemRMStateStore to setup right file-system
|
* only is used by the FileSystemRMStateStore to setup right file-system
|
||||||
|
|
|
@ -135,9 +135,19 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
|
||||||
configurationPrefixToSkipCompare
|
configurationPrefixToSkipCompare
|
||||||
.add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
|
.add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
|
||||||
configurationPrefixToSkipCompare
|
configurationPrefixToSkipCompare
|
||||||
.add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS);
|
.add(YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS);
|
||||||
configurationPrefixToSkipCompare
|
configurationPrefixToSkipCompare
|
||||||
.add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR);
|
.add(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR);
|
||||||
|
configurationPrefixToSkipCompare
|
||||||
|
.add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH);
|
||||||
|
configurationPrefixToSkipCompare
|
||||||
|
.add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH);
|
||||||
|
configurationPrefixToSkipCompare
|
||||||
|
.add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS);
|
||||||
|
configurationPrefixToSkipCompare
|
||||||
|
.add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS);
|
||||||
|
configurationPrefixToSkipCompare
|
||||||
|
.add(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV);
|
||||||
|
|
||||||
// Set by container-executor.cfg
|
// Set by container-executor.cfg
|
||||||
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
|
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
|
||||||
|
@ -82,4 +83,7 @@ public interface NodeHeartbeatResponse {
|
||||||
|
|
||||||
List<Container> getContainersToDecrease();
|
List<Container> getContainersToDecrease();
|
||||||
void addAllContainersToDecrease(Collection<Container> containersToDecrease);
|
void addAllContainersToDecrease(Collection<Container> containersToDecrease);
|
||||||
|
|
||||||
|
ContainerQueuingLimit getContainerQueuingLimit();
|
||||||
|
void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit);
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
||||||
|
@ -49,8 +50,10 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatR
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.ContainerQueuingLimitPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
|
|
||||||
|
|
||||||
|
@ -68,6 +71,7 @@ public class NodeHeartbeatResponsePBImpl extends
|
||||||
|
|
||||||
private MasterKey containerTokenMasterKey = null;
|
private MasterKey containerTokenMasterKey = null;
|
||||||
private MasterKey nmTokenMasterKey = null;
|
private MasterKey nmTokenMasterKey = null;
|
||||||
|
private ContainerQueuingLimit containerQueuingLimit = null;
|
||||||
private List<Container> containersToDecrease = null;
|
private List<Container> containersToDecrease = null;
|
||||||
private List<SignalContainerRequest> containersToSignal = null;
|
private List<SignalContainerRequest> containersToSignal = null;
|
||||||
|
|
||||||
|
@ -105,6 +109,10 @@ public class NodeHeartbeatResponsePBImpl extends
|
||||||
builder.setNmTokenMasterKey(
|
builder.setNmTokenMasterKey(
|
||||||
convertToProtoFormat(this.nmTokenMasterKey));
|
convertToProtoFormat(this.nmTokenMasterKey));
|
||||||
}
|
}
|
||||||
|
if (this.containerQueuingLimit != null) {
|
||||||
|
builder.setContainerQueuingLimit(
|
||||||
|
convertToProtoFormat(this.containerQueuingLimit));
|
||||||
|
}
|
||||||
if (this.systemCredentials != null) {
|
if (this.systemCredentials != null) {
|
||||||
addSystemCredentialsToProto();
|
addSystemCredentialsToProto();
|
||||||
}
|
}
|
||||||
|
@ -224,6 +232,30 @@ public class NodeHeartbeatResponsePBImpl extends
|
||||||
this.nmTokenMasterKey = masterKey;
|
this.nmTokenMasterKey = masterKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ContainerQueuingLimit getContainerQueuingLimit() {
|
||||||
|
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (this.containerQueuingLimit != null) {
|
||||||
|
return this.containerQueuingLimit;
|
||||||
|
}
|
||||||
|
if (!p.hasContainerQueuingLimit()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
this.containerQueuingLimit =
|
||||||
|
convertFromProtoFormat(p.getContainerQueuingLimit());
|
||||||
|
return this.containerQueuingLimit;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setContainerQueuingLimit(ContainerQueuingLimit
|
||||||
|
containerQueuingLimit) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (containerQueuingLimit == null) {
|
||||||
|
builder.clearContainerQueuingLimit();
|
||||||
|
}
|
||||||
|
this.containerQueuingLimit = containerQueuingLimit;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NodeAction getNodeAction() {
|
public NodeAction getNodeAction() {
|
||||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
@ -674,6 +706,16 @@ public class NodeHeartbeatResponsePBImpl extends
|
||||||
builder.addAllContainersToSignal(iterable);
|
builder.addAllContainersToSignal(iterable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ContainerQueuingLimit convertFromProtoFormat(
|
||||||
|
ContainerQueuingLimitProto p) {
|
||||||
|
return new ContainerQueuingLimitPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ContainerQueuingLimitProto convertToProtoFormat(
|
||||||
|
ContainerQueuingLimit c) {
|
||||||
|
return ((ContainerQueuingLimitPBImpl)c).getProto();
|
||||||
|
}
|
||||||
|
|
||||||
private SignalContainerRequestPBImpl convertFromProtoFormat(
|
private SignalContainerRequestPBImpl convertFromProtoFormat(
|
||||||
SignalContainerRequestProto p) {
|
SignalContainerRequestProto p) {
|
||||||
return new SignalContainerRequestPBImpl(p);
|
return new SignalContainerRequestPBImpl(p);
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.api.records;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to hold max wait time / queue length information to be
|
||||||
|
* passed back to the NodeManager.
|
||||||
|
*/
|
||||||
|
public abstract class ContainerQueuingLimit {
|
||||||
|
|
||||||
|
public static ContainerQueuingLimit newInstance() {
|
||||||
|
ContainerQueuingLimit containerQueuingLimit =
|
||||||
|
Records.newRecord(ContainerQueuingLimit.class);
|
||||||
|
containerQueuingLimit.setMaxQueueLength(-1);
|
||||||
|
containerQueuingLimit.setMaxQueueWaitTimeInMs(-1);
|
||||||
|
return containerQueuingLimit;
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract int getMaxQueueLength();
|
||||||
|
|
||||||
|
public abstract void setMaxQueueLength(int queueLength);
|
||||||
|
|
||||||
|
public abstract int getMaxQueueWaitTimeInMs();
|
||||||
|
|
||||||
|
public abstract void setMaxQueueWaitTimeInMs(int waitTime);
|
||||||
|
}
|
|
@ -0,0 +1,80 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.api.records.impl.pb;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProtoOrBuilder;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of ContainerQueuingLimit interface.
|
||||||
|
*/
|
||||||
|
public class ContainerQueuingLimitPBImpl extends ContainerQueuingLimit {
|
||||||
|
|
||||||
|
private ContainerQueuingLimitProto proto =
|
||||||
|
ContainerQueuingLimitProto.getDefaultInstance();
|
||||||
|
private ContainerQueuingLimitProto.Builder builder = null;
|
||||||
|
private boolean viaProto = false;
|
||||||
|
|
||||||
|
public ContainerQueuingLimitPBImpl() {
|
||||||
|
builder = ContainerQueuingLimitProto.newBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContainerQueuingLimitPBImpl(ContainerQueuingLimitProto proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
this.viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContainerQueuingLimitProto getProto() {
|
||||||
|
proto = viaProto ? proto : builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void maybeInitBuilder() {
|
||||||
|
if (viaProto || builder == null) {
|
||||||
|
builder = ContainerQueuingLimitProto.newBuilder(proto);
|
||||||
|
}
|
||||||
|
viaProto = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaxQueueWaitTimeInMs() {
|
||||||
|
ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
return p.getMaxQueueWaitTimeInMs();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxQueueWaitTimeInMs(int waitTime) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setMaxQueueWaitTimeInMs(waitTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaxQueueLength() {
|
||||||
|
ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
return p.getMaxQueueLength();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxQueueLength(int queueLength) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setMaxQueueLength(queueLength);
|
||||||
|
}
|
||||||
|
}
|
|
@ -102,6 +102,12 @@ message NodeHeartbeatResponseProto {
|
||||||
repeated ContainerProto containers_to_decrease = 12;
|
repeated ContainerProto containers_to_decrease = 12;
|
||||||
repeated SignalContainerRequestProto containers_to_signal = 13;
|
repeated SignalContainerRequestProto containers_to_signal = 13;
|
||||||
optional ResourceProto resource = 14;
|
optional ResourceProto resource = 14;
|
||||||
|
optional ContainerQueuingLimitProto container_queuing_limit = 15;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ContainerQueuingLimitProto {
|
||||||
|
optional int32 max_queue_length = 1;
|
||||||
|
optional int32 max_queue_wait_time_in_ms = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SystemCredentialsForAppsProto {
|
message SystemCredentialsForAppsProto {
|
||||||
|
|
|
@ -23,13 +23,13 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
@ -82,7 +82,7 @@ public interface Context {
|
||||||
|
|
||||||
NodeHealthStatus getNodeHealthStatus();
|
NodeHealthStatus getNodeHealthStatus();
|
||||||
|
|
||||||
ContainerManagementProtocol getContainerManager();
|
ContainerManager getContainerManager();
|
||||||
|
|
||||||
NodeResourceMonitor getNodeResourceMonitor();
|
NodeResourceMonitor getNodeResourceMonitor();
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.util.ShutdownHookManager;
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
@ -465,7 +465,7 @@ public class NodeManager extends CompositeService
|
||||||
|
|
||||||
private final NMContainerTokenSecretManager containerTokenSecretManager;
|
private final NMContainerTokenSecretManager containerTokenSecretManager;
|
||||||
private final NMTokenSecretManagerInNM nmTokenSecretManager;
|
private final NMTokenSecretManagerInNM nmTokenSecretManager;
|
||||||
private ContainerManagementProtocol containerManager;
|
private ContainerManager containerManager;
|
||||||
private NodeResourceMonitor nodeResourceMonitor;
|
private NodeResourceMonitor nodeResourceMonitor;
|
||||||
private final LocalDirsHandlerService dirsHandler;
|
private final LocalDirsHandlerService dirsHandler;
|
||||||
private final ApplicationACLsManager aclsManager;
|
private final ApplicationACLsManager aclsManager;
|
||||||
|
@ -555,11 +555,11 @@ public class NodeManager extends CompositeService
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ContainerManagementProtocol getContainerManager() {
|
public ContainerManager getContainerManager() {
|
||||||
return this.containerManager;
|
return this.containerManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setContainerManager(ContainerManagementProtocol containerManager) {
|
public void setContainerManager(ContainerManager containerManager) {
|
||||||
this.containerManager = containerManager;
|
this.containerManager = containerManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -71,13 +71,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||||
|
@ -411,8 +412,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
|
|
||||||
LOG.info(successfullRegistrationMsg);
|
LOG.info(successfullRegistrationMsg);
|
||||||
LOG.info("Notifying ContainerManager to unblock new container-requests");
|
LOG.info("Notifying ContainerManager to unblock new container-requests");
|
||||||
((ContainerManagerImpl) this.context.getContainerManager())
|
this.context.getContainerManager().setBlockNewContainerRequests(false);
|
||||||
.setBlockNewContainerRequests(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<ApplicationId> createKeepAliveApplicationList() {
|
private List<ApplicationId> createKeepAliveApplicationList() {
|
||||||
|
@ -475,10 +475,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
* @return Resource utilization of all the containers.
|
* @return Resource utilization of all the containers.
|
||||||
*/
|
*/
|
||||||
private ResourceUtilization getContainersUtilization() {
|
private ResourceUtilization getContainersUtilization() {
|
||||||
ContainerManagerImpl containerManager =
|
|
||||||
(ContainerManagerImpl) this.context.getContainerManager();
|
|
||||||
ContainersMonitor containersMonitor =
|
ContainersMonitor containersMonitor =
|
||||||
containerManager.getContainersMonitor();
|
this.context.getContainerManager().getContainersMonitor();
|
||||||
return containersMonitor.getContainersUtilization();
|
return containersMonitor.getContainersUtilization();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -751,7 +749,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
Set<NodeLabel> nodeLabelsForHeartbeat =
|
Set<NodeLabel> nodeLabelsForHeartbeat =
|
||||||
nodeLabelsHandler.getNodeLabelsForHeartbeat();
|
nodeLabelsHandler.getNodeLabelsForHeartbeat();
|
||||||
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
|
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
|
||||||
|
|
||||||
NodeHeartbeatRequest request =
|
NodeHeartbeatRequest request =
|
||||||
NodeHeartbeatRequest.newInstance(nodeStatus,
|
NodeHeartbeatRequest.newInstance(nodeStatus,
|
||||||
NodeStatusUpdaterImpl.this.context
|
NodeStatusUpdaterImpl.this.context
|
||||||
|
@ -776,33 +773,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
nextHeartBeatInterval = response.getNextHeartBeatInterval();
|
nextHeartBeatInterval = response.getNextHeartBeatInterval();
|
||||||
updateMasterKeys(response);
|
updateMasterKeys(response);
|
||||||
|
|
||||||
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
|
if (!handleShutdownOrResyncCommand(response)) {
|
||||||
LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of"
|
nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
|
||||||
+ " heartbeat, hence shutting down.");
|
response);
|
||||||
LOG.warn("Message from ResourceManager: "
|
|
||||||
+ response.getDiagnosticsMessage());
|
|
||||||
context.setDecommissioned(true);
|
|
||||||
dispatcher.getEventHandler().handle(
|
|
||||||
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (response.getNodeAction() == NodeAction.RESYNC) {
|
|
||||||
LOG.warn("Node is out of sync with ResourceManager,"
|
|
||||||
+ " hence resyncing.");
|
|
||||||
LOG.warn("Message from ResourceManager: "
|
|
||||||
+ response.getDiagnosticsMessage());
|
|
||||||
// Invalidate the RMIdentifier while resync
|
|
||||||
NodeStatusUpdaterImpl.this.rmIdentifier =
|
|
||||||
ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
|
||||||
dispatcher.getEventHandler().handle(
|
|
||||||
new NodeManagerEvent(NodeManagerEventType.RESYNC));
|
|
||||||
pendingCompletedContainers.clear();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(response);
|
// Explicitly put this method after checking the resync
|
||||||
|
// response. We
|
||||||
// Explicitly put this method after checking the resync response. We
|
|
||||||
// don't want to remove the completed containers before resync
|
// don't want to remove the completed containers before resync
|
||||||
// because these completed containers will be reported back to RM
|
// because these completed containers will be reported back to RM
|
||||||
// when NM re-registers with RM.
|
// when NM re-registers with RM.
|
||||||
|
@ -817,7 +793,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
if (!containersToCleanup.isEmpty()) {
|
if (!containersToCleanup.isEmpty()) {
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new CMgrCompletedContainersEvent(containersToCleanup,
|
new CMgrCompletedContainersEvent(containersToCleanup,
|
||||||
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
|
CMgrCompletedContainersEvent.Reason
|
||||||
|
.BY_RESOURCEMANAGER));
|
||||||
}
|
}
|
||||||
List<ApplicationId> appsToCleanup =
|
List<ApplicationId> appsToCleanup =
|
||||||
response.getApplicationsToCleanup();
|
response.getApplicationsToCleanup();
|
||||||
|
@ -828,31 +805,39 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
new CMgrCompletedAppsEvent(appsToCleanup,
|
new CMgrCompletedAppsEvent(appsToCleanup,
|
||||||
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<ApplicationId, ByteBuffer> systemCredentials =
|
Map<ApplicationId, ByteBuffer> systemCredentials =
|
||||||
response.getSystemCredentialsForApps();
|
response.getSystemCredentialsForApps();
|
||||||
if (systemCredentials != null && !systemCredentials.isEmpty()) {
|
if (systemCredentials != null && !systemCredentials.isEmpty()) {
|
||||||
((NMContext) context)
|
((NMContext) context).setSystemCrendentialsForApps(
|
||||||
.setSystemCrendentialsForApps(parseCredentials(systemCredentials));
|
parseCredentials(systemCredentials));
|
||||||
}
|
}
|
||||||
|
|
||||||
List<org.apache.hadoop.yarn.api.records.Container>
|
List<org.apache.hadoop.yarn.api.records.Container>
|
||||||
containersToDecrease = response.getContainersToDecrease();
|
containersToDecrease = response.getContainersToDecrease();
|
||||||
if (!containersToDecrease.isEmpty()) {
|
if (!containersToDecrease.isEmpty()) {
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new CMgrDecreaseContainersResourceEvent(containersToDecrease)
|
new CMgrDecreaseContainersResourceEvent(
|
||||||
|
containersToDecrease)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// SignalContainer request originally comes from end users via
|
// SignalContainer request originally comes from end users via
|
||||||
// ClientRMProtocol's SignalContainer. Forward the request to
|
// ClientRMProtocol's SignalContainer. Forward the request to
|
||||||
// ContainerManager which will dispatch the event to ContainerLauncher.
|
// ContainerManager which will dispatch the event to
|
||||||
|
// ContainerLauncher.
|
||||||
List<SignalContainerRequest> containersToSignal = response
|
List<SignalContainerRequest> containersToSignal = response
|
||||||
.getContainersToSignalList();
|
.getContainersToSignalList();
|
||||||
if (containersToSignal.size() != 0) {
|
if (containersToSignal.size() != 0) {
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new CMgrSignalContainersEvent(containersToSignal));
|
new CMgrSignalContainersEvent(containersToSignal));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update QueuingLimits if ContainerManager supports queuing
|
||||||
|
ContainerQueuingLimit queuingLimit =
|
||||||
|
response.getContainerQueuingLimit();
|
||||||
|
if (queuingLimit != null) {
|
||||||
|
context.getContainerManager().updateQueuingLimit(queuingLimit);
|
||||||
|
}
|
||||||
|
}
|
||||||
// Handling node resource update case.
|
// Handling node resource update case.
|
||||||
Resource newResource = response.getResource();
|
Resource newResource = response.getResource();
|
||||||
if (newResource != null) {
|
if (newResource != null) {
|
||||||
|
@ -908,6 +893,34 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
statusUpdater.start();
|
statusUpdater.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean handleShutdownOrResyncCommand(
|
||||||
|
NodeHeartbeatResponse response) {
|
||||||
|
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
|
||||||
|
LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of"
|
||||||
|
+ " heartbeat, hence shutting down.");
|
||||||
|
LOG.warn("Message from ResourceManager: "
|
||||||
|
+ response.getDiagnosticsMessage());
|
||||||
|
context.setDecommissioned(true);
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
|
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (response.getNodeAction() == NodeAction.RESYNC) {
|
||||||
|
LOG.warn("Node is out of sync with ResourceManager,"
|
||||||
|
+ " hence resyncing.");
|
||||||
|
LOG.warn("Message from ResourceManager: "
|
||||||
|
+ response.getDiagnosticsMessage());
|
||||||
|
// Invalidate the RMIdentifier while resync
|
||||||
|
NodeStatusUpdaterImpl.this.rmIdentifier =
|
||||||
|
ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
||||||
|
dispatcher.getEventHandler().handle(
|
||||||
|
new NodeManagerEvent(NodeManagerEventType.RESYNC));
|
||||||
|
pendingCompletedContainers.clear();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
private List<LogAggregationReport> getLogAggregationReportsForApps(
|
private List<LogAggregationReport> getLogAggregationReportsForApps(
|
||||||
ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
|
ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
|
||||||
LogAggregationReport status;
|
LogAggregationReport status;
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.service.ServiceStateChangeListener;
|
||||||
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
|
||||||
|
.ContainersMonitor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The ContainerManager is an entity that manages the life cycle of Containers.
|
||||||
|
*/
|
||||||
|
public interface ContainerManager extends ServiceStateChangeListener,
|
||||||
|
ContainerManagementProtocol,
|
||||||
|
EventHandler<ContainerManagerEvent> {
|
||||||
|
|
||||||
|
ContainersMonitor getContainersMonitor();
|
||||||
|
|
||||||
|
void updateQueuingLimit(ContainerQueuingLimit queuingLimit);
|
||||||
|
|
||||||
|
void setBlockNewContainerRequests(boolean blockNewContainerRequests);
|
||||||
|
|
||||||
|
}
|
|
@ -53,7 +53,6 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.service.CompositeService;
|
import org.apache.hadoop.service.CompositeService;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.service.ServiceStateChangeListener;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||||
|
@ -95,6 +94,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Containe
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
||||||
|
@ -150,8 +150,7 @@ import com.google.protobuf.ByteString;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
public class ContainerManagerImpl extends CompositeService implements
|
public class ContainerManagerImpl extends CompositeService implements
|
||||||
ServiceStateChangeListener, ContainerManagementProtocol,
|
ContainerManager {
|
||||||
EventHandler<ContainerManagerEvent> {
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extra duration to wait for applications to be killed on shutdown.
|
* Extra duration to wait for applications to be killed on shutdown.
|
||||||
|
@ -410,6 +409,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ContainersMonitor getContainersMonitor() {
|
public ContainersMonitor getContainersMonitor() {
|
||||||
return this.containersMonitor;
|
return this.containersMonitor;
|
||||||
}
|
}
|
||||||
|
@ -1398,6 +1398,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
|
public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
|
||||||
this.blockNewContainerRequests.set(blockNewContainerRequests);
|
this.blockNewContainerRequests.set(blockNewContainerRequests);
|
||||||
}
|
}
|
||||||
|
@ -1434,4 +1435,9 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
protected boolean isServiceStopped() {
|
protected boolean isServiceStopped() {
|
||||||
return serviceStopped;
|
return serviceStopped;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) {
|
||||||
|
LOG.trace("Implementation does not support queuing of Containers !!");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
@ -83,6 +84,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
||||||
private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
|
private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
|
||||||
|
|
||||||
private Set<ContainerId> opportunisticContainersToKill;
|
private Set<ContainerId> opportunisticContainersToKill;
|
||||||
|
private final ContainerQueuingLimit queuingLimit;
|
||||||
|
|
||||||
public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
|
public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
|
||||||
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
||||||
|
@ -95,6 +97,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
||||||
this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
|
this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
|
||||||
this.opportunisticContainersToKill = Collections.synchronizedSet(
|
this.opportunisticContainersToKill = Collections.synchronizedSet(
|
||||||
new HashSet<ContainerId>());
|
new HashSet<ContainerId>());
|
||||||
|
this.queuingLimit = ContainerQueuingLimit.newInstance();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -468,7 +471,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
||||||
+ "will be added to the queued containers.");
|
+ "will be added to the queued containers.");
|
||||||
|
|
||||||
AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
|
AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
|
||||||
token, null, rcs.getStartRequest(), token.getExecutionType(),
|
token, rcs.getStartRequest(), token.getExecutionType(),
|
||||||
token.getResource(), getConfig());
|
token.getResource(), getConfig());
|
||||||
|
|
||||||
this.context.getQueuingContext().getQueuedContainers().put(
|
this.context.getQueuingContext().getQueuedContainers().put(
|
||||||
|
@ -526,6 +529,41 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateQueuingLimit(ContainerQueuingLimit limit) {
|
||||||
|
this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());
|
||||||
|
// TODO: Include wait time as well once it is implemented
|
||||||
|
if (this.queuingLimit.getMaxQueueLength() > -1) {
|
||||||
|
shedQueuedOpportunisticContainers();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shedQueuedOpportunisticContainers() {
|
||||||
|
int numAllowed = this.queuingLimit.getMaxQueueLength();
|
||||||
|
Iterator<AllocatedContainerInfo> containerIter =
|
||||||
|
queuedOpportunisticContainers.iterator();
|
||||||
|
while (containerIter.hasNext()) {
|
||||||
|
AllocatedContainerInfo cInfo = containerIter.next();
|
||||||
|
if (numAllowed <= 0) {
|
||||||
|
containerIter.remove();
|
||||||
|
ContainerTokenIdentifier containerTokenIdentifier = this.context
|
||||||
|
.getQueuingContext().getQueuedContainers().remove(
|
||||||
|
cInfo.getContainerTokenIdentifier().getContainerID());
|
||||||
|
// The Container might have already started while we were
|
||||||
|
// iterating..
|
||||||
|
if (containerTokenIdentifier != null) {
|
||||||
|
this.context.getQueuingContext().getKilledQueuedContainers()
|
||||||
|
.putIfAbsent(cInfo.getContainerTokenIdentifier(),
|
||||||
|
"Container De-queued to meet global queuing limits. "
|
||||||
|
+ "Max Queue length["
|
||||||
|
+ this.queuingLimit.getMaxQueueLength() + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
numAllowed--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static class AllocatedContainerInfo {
|
static class AllocatedContainerInfo {
|
||||||
private final ContainerTokenIdentifier containerTokenIdentifier;
|
private final ContainerTokenIdentifier containerTokenIdentifier;
|
||||||
private final StartContainerRequest startRequest;
|
private final StartContainerRequest startRequest;
|
||||||
|
|
|
@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
@ -642,7 +643,7 @@ public abstract class BaseAMRMProxyTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ContainerManagementProtocol getContainerManager() {
|
public ContainerManager getContainerManager() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,13 +24,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementations of this class are notified of changes to the cluster's state,
|
||||||
|
* such as node addition, removal and updates.
|
||||||
|
*/
|
||||||
public interface ClusterMonitor {
|
public interface ClusterMonitor {
|
||||||
|
|
||||||
void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode);
|
void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode);
|
||||||
|
|
||||||
void removeNode(RMNode removedRMNode);
|
void removeNode(RMNode removedRMNode);
|
||||||
|
|
||||||
void nodeUpdate(RMNode rmNode);
|
void updateNode(RMNode rmNode);
|
||||||
|
|
||||||
void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
|
void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,8 +46,10 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
|
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
|
||||||
.TopKNodeSelector;
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
|
||||||
|
@ -57,7 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -76,30 +77,64 @@ public class DistributedSchedulingService extends ApplicationMasterService
|
||||||
private static final Log LOG =
|
private static final Log LOG =
|
||||||
LogFactory.getLog(DistributedSchedulingService.class);
|
LogFactory.getLog(DistributedSchedulingService.class);
|
||||||
|
|
||||||
private final TopKNodeSelector clusterMonitor;
|
private final NodeQueueLoadMonitor nodeMonitor;
|
||||||
|
|
||||||
private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
|
private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
|
||||||
new ConcurrentHashMap<>();
|
new ConcurrentHashMap<>();
|
||||||
private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
|
private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
|
||||||
new ConcurrentHashMap<>();
|
new ConcurrentHashMap<>();
|
||||||
|
private final int k;
|
||||||
|
|
||||||
public DistributedSchedulingService(RMContext rmContext,
|
public DistributedSchedulingService(RMContext rmContext,
|
||||||
YarnScheduler scheduler) {
|
YarnScheduler scheduler) {
|
||||||
super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
|
super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
|
||||||
int k = rmContext.getYarnConfiguration().getInt(
|
this.k = rmContext.getYarnConfiguration().getInt(
|
||||||
YarnConfiguration.DIST_SCHEDULING_TOP_K,
|
YarnConfiguration.DIST_SCHEDULING_TOP_K,
|
||||||
YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
|
YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
|
||||||
long topKComputationInterval = rmContext.getYarnConfiguration().getLong(
|
long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
|
||||||
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS,
|
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
|
||||||
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT);
|
YarnConfiguration.
|
||||||
TopKNodeSelector.TopKComparator comparator =
|
NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
|
||||||
TopKNodeSelector.TopKComparator.valueOf(
|
NodeQueueLoadMonitor.LoadComparator comparator =
|
||||||
|
NodeQueueLoadMonitor.LoadComparator.valueOf(
|
||||||
rmContext.getYarnConfiguration().get(
|
rmContext.getYarnConfiguration().get(
|
||||||
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR,
|
YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
|
||||||
YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT));
|
YarnConfiguration.
|
||||||
TopKNodeSelector topKSelector =
|
NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT));
|
||||||
new TopKNodeSelector(k, topKComputationInterval, comparator);
|
|
||||||
this.clusterMonitor = topKSelector;
|
NodeQueueLoadMonitor topKSelector =
|
||||||
|
new NodeQueueLoadMonitor(nodeSortInterval, comparator);
|
||||||
|
|
||||||
|
float sigma = rmContext.getYarnConfiguration()
|
||||||
|
.getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
|
||||||
|
YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT);
|
||||||
|
|
||||||
|
int limitMin, limitMax;
|
||||||
|
|
||||||
|
if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
|
||||||
|
limitMin = rmContext.getYarnConfiguration()
|
||||||
|
.getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
|
||||||
|
YarnConfiguration.
|
||||||
|
NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT);
|
||||||
|
limitMax = rmContext.getYarnConfiguration()
|
||||||
|
.getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
|
||||||
|
YarnConfiguration.
|
||||||
|
NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT);
|
||||||
|
} else {
|
||||||
|
limitMin = rmContext.getYarnConfiguration()
|
||||||
|
.getInt(
|
||||||
|
YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
|
||||||
|
YarnConfiguration.
|
||||||
|
NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT);
|
||||||
|
limitMax = rmContext.getYarnConfiguration()
|
||||||
|
.getInt(
|
||||||
|
YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
|
||||||
|
YarnConfiguration.
|
||||||
|
NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
|
||||||
|
this.nodeMonitor = topKSelector;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -189,7 +224,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
|
||||||
|
|
||||||
// Set nodes to be used for scheduling
|
// Set nodes to be used for scheduling
|
||||||
dsResp.setNodesForScheduling(
|
dsResp.setNodesForScheduling(
|
||||||
new ArrayList<>(this.clusterMonitor.selectNodes()));
|
this.nodeMonitor.selectLeastLoadedNodes(this.k));
|
||||||
return dsResp;
|
return dsResp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -201,7 +236,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
|
||||||
(DistSchedAllocateResponse.class);
|
(DistSchedAllocateResponse.class);
|
||||||
dsResp.setAllocateResponse(response);
|
dsResp.setAllocateResponse(response);
|
||||||
dsResp.setNodesForScheduling(
|
dsResp.setNodesForScheduling(
|
||||||
new ArrayList<>(this.clusterMonitor.selectNodes()));
|
this.nodeMonitor.selectLeastLoadedNodes(this.k));
|
||||||
return dsResp;
|
return dsResp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -233,8 +268,8 @@ public class DistributedSchedulingService extends ApplicationMasterService
|
||||||
if (!(event instanceof NodeAddedSchedulerEvent)) {
|
if (!(event instanceof NodeAddedSchedulerEvent)) {
|
||||||
throw new RuntimeException("Unexpected event type: " + event);
|
throw new RuntimeException("Unexpected event type: " + event);
|
||||||
}
|
}
|
||||||
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
|
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
|
||||||
clusterMonitor.addNode(nodeAddedEvent.getContainerReports(),
|
nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
|
||||||
nodeAddedEvent.getAddedRMNode());
|
nodeAddedEvent.getAddedRMNode());
|
||||||
addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
|
addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
|
||||||
nodeAddedEvent.getAddedRMNode().getNodeID());
|
nodeAddedEvent.getAddedRMNode().getNodeID());
|
||||||
|
@ -246,8 +281,8 @@ public class DistributedSchedulingService extends ApplicationMasterService
|
||||||
throw new RuntimeException("Unexpected event type: " + event);
|
throw new RuntimeException("Unexpected event type: " + event);
|
||||||
}
|
}
|
||||||
NodeRemovedSchedulerEvent nodeRemovedEvent =
|
NodeRemovedSchedulerEvent nodeRemovedEvent =
|
||||||
(NodeRemovedSchedulerEvent)event;
|
(NodeRemovedSchedulerEvent) event;
|
||||||
clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
|
nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
|
||||||
removeFromMapping(rackToNode,
|
removeFromMapping(rackToNode,
|
||||||
nodeRemovedEvent.getRemovedRMNode().getRackName(),
|
nodeRemovedEvent.getRemovedRMNode().getRackName(),
|
||||||
nodeRemovedEvent.getRemovedRMNode().getNodeID());
|
nodeRemovedEvent.getRemovedRMNode().getNodeID());
|
||||||
|
@ -259,16 +294,17 @@ public class DistributedSchedulingService extends ApplicationMasterService
|
||||||
if (!(event instanceof NodeUpdateSchedulerEvent)) {
|
if (!(event instanceof NodeUpdateSchedulerEvent)) {
|
||||||
throw new RuntimeException("Unexpected event type: " + event);
|
throw new RuntimeException("Unexpected event type: " + event);
|
||||||
}
|
}
|
||||||
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
|
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
|
||||||
clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode());
|
event;
|
||||||
|
nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
|
||||||
break;
|
break;
|
||||||
case NODE_RESOURCE_UPDATE:
|
case NODE_RESOURCE_UPDATE:
|
||||||
if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
|
if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
|
||||||
throw new RuntimeException("Unexpected event type: " + event);
|
throw new RuntimeException("Unexpected event type: " + event);
|
||||||
}
|
}
|
||||||
NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
|
NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
|
||||||
(NodeResourceUpdateSchedulerEvent)event;
|
(NodeResourceUpdateSchedulerEvent) event;
|
||||||
clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
|
nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
|
||||||
nodeResourceUpdatedEvent.getResourceOption());
|
nodeResourceUpdatedEvent.getResourceOption());
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
@ -290,6 +326,10 @@ public class DistributedSchedulingService extends ApplicationMasterService
|
||||||
LOG.error("Unknown event arrived at DistributedSchedulingService: "
|
LOG.error("Unknown event arrived at DistributedSchedulingService: "
|
||||||
+ event.toString());
|
+ event.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
|
||||||
|
return nodeMonitor.getThresholdCalculator();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||||
|
@ -139,4 +141,6 @@ public interface RMContext {
|
||||||
void setLeaderElectorService(LeaderElectorService elector);
|
void setLeaderElectorService(LeaderElectorService elector);
|
||||||
|
|
||||||
LeaderElectorService getLeaderElectorService();
|
LeaderElectorService getLeaderElectorService();
|
||||||
|
|
||||||
|
QueueLimitCalculator getNodeManagerQueueLimitCalculator();
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||||
|
@ -74,6 +76,8 @@ public class RMContextImpl implements RMContext {
|
||||||
private SystemMetricsPublisher systemMetricsPublisher;
|
private SystemMetricsPublisher systemMetricsPublisher;
|
||||||
private LeaderElectorService elector;
|
private LeaderElectorService elector;
|
||||||
|
|
||||||
|
private QueueLimitCalculator queueLimitCalculator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default constructor. To be used in conjunction with setter methods for
|
* Default constructor. To be used in conjunction with setter methods for
|
||||||
* individual fields.
|
* individual fields.
|
||||||
|
@ -472,4 +476,14 @@ public class RMContextImpl implements RMContext {
|
||||||
public void setQueuePlacementManager(PlacementManager placementMgr) {
|
public void setQueuePlacementManager(PlacementManager placementMgr) {
|
||||||
this.activeServiceContext.setQueuePlacementManager(placementMgr);
|
this.activeServiceContext.setQueuePlacementManager(placementMgr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
|
||||||
|
return this.queueLimitCalculator;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setContainerQueueLimitCalculator(
|
||||||
|
QueueLimitCalculator limitCalculator) {
|
||||||
|
this.queueLimitCalculator = limitCalculator;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1154,6 +1154,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
addService(distSchedulerEventDispatcher);
|
addService(distSchedulerEventDispatcher);
|
||||||
rmDispatcher.register(SchedulerEventType.class,
|
rmDispatcher.register(SchedulerEventType.class,
|
||||||
distSchedulerEventDispatcher);
|
distSchedulerEventDispatcher);
|
||||||
|
this.rmContext.setContainerQueueLimitCalculator(
|
||||||
|
distributedSchedulingService.getNodeManagerQueueLimitCalculator());
|
||||||
return distributedSchedulingService;
|
return distributedSchedulingService;
|
||||||
}
|
}
|
||||||
return new ApplicationMasterService(this.rmContext, scheduler);
|
return new ApplicationMasterService(this.rmContext, scheduler);
|
||||||
|
|
|
@ -563,6 +563,13 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
nodeHeartBeatResponse.setResource(capability);
|
nodeHeartBeatResponse.setResource(capability);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 7. Send Container Queuing Limits back to the Node. This will be used by
|
||||||
|
// the node to truncate the number of Containers queued for execution.
|
||||||
|
if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) {
|
||||||
|
nodeHeartBeatResponse.setContainerQueuingLimit(
|
||||||
|
this.rmContext.getNodeManagerQueueLimitCalculator()
|
||||||
|
.createContainerQueuingLimit());
|
||||||
|
}
|
||||||
return nodeHeartBeatResponse;
|
return nodeHeartBeatResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,36 +31,47 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class TopKNodeSelector implements ClusterMonitor {
|
/**
|
||||||
|
* The NodeQueueLoadMonitor keeps track of load metrics (such as queue length
|
||||||
|
* and total wait time) associated with Container Queues on the Node Manager.
|
||||||
|
* It uses this information to periodically sort the Nodes from least to most
|
||||||
|
* loaded.
|
||||||
|
*/
|
||||||
|
public class NodeQueueLoadMonitor implements ClusterMonitor {
|
||||||
|
|
||||||
final static Log LOG = LogFactory.getLog(TopKNodeSelector.class);
|
final static Log LOG = LogFactory.getLog(NodeQueueLoadMonitor.class);
|
||||||
|
|
||||||
public enum TopKComparator implements Comparator<ClusterNode> {
|
/**
|
||||||
WAIT_TIME,
|
* The comparator used to specify the metric against which the load
|
||||||
QUEUE_LENGTH;
|
* of two Nodes are compared.
|
||||||
|
*/
|
||||||
|
public enum LoadComparator implements Comparator<ClusterNode> {
|
||||||
|
QUEUE_LENGTH,
|
||||||
|
QUEUE_WAIT_TIME;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int compare(ClusterNode o1, ClusterNode o2) {
|
public int compare(ClusterNode o1, ClusterNode o2) {
|
||||||
if (getQuant(o1) == getQuant(o2)) {
|
if (getMetric(o1) == getMetric(o2)) {
|
||||||
return o1.timestamp < o2.timestamp ? +1 : -1;
|
return o1.timestamp < o2.timestamp ? +1 : -1;
|
||||||
}
|
}
|
||||||
return getQuant(o1) > getQuant(o2) ? +1 : -1;
|
return getMetric(o1) > getMetric(o2) ? +1 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getQuant(ClusterNode c) {
|
public int getMetric(ClusterNode c) {
|
||||||
return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength;
|
return (this == QUEUE_LENGTH) ? c.queueLength : c.queueWaitTime;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class ClusterNode {
|
static class ClusterNode {
|
||||||
int queueTime = -1;
|
int queueLength = 0;
|
||||||
int waitQueueLength = 0;
|
int queueWaitTime = -1;
|
||||||
double timestamp;
|
double timestamp;
|
||||||
final NodeId nodeId;
|
final NodeId nodeId;
|
||||||
|
|
||||||
|
@ -69,13 +80,13 @@ public class TopKNodeSelector implements ClusterMonitor {
|
||||||
updateTimestamp();
|
updateTimestamp();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClusterNode setQueueTime(int queueTime) {
|
public ClusterNode setQueueLength(int qLength) {
|
||||||
this.queueTime = queueTime;
|
this.queueLength = qLength;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClusterNode setWaitQueueLength(int queueLength) {
|
public ClusterNode setQueueWaitTime(int wTime) {
|
||||||
this.waitQueueLength = queueLength;
|
this.queueWaitTime = wTime;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,34 +96,37 @@ public class TopKNodeSelector implements ClusterMonitor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final int k;
|
|
||||||
private final List<NodeId> topKNodes;
|
|
||||||
private final ScheduledExecutorService scheduledExecutor;
|
private final ScheduledExecutorService scheduledExecutor;
|
||||||
private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>();
|
|
||||||
private final Comparator<ClusterNode> comparator;
|
private final List<NodeId> sortedNodes;
|
||||||
|
private final Map<NodeId, ClusterNode> clusterNodes =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
private final LoadComparator comparator;
|
||||||
|
private QueueLimitCalculator thresholdCalculator;
|
||||||
|
|
||||||
Runnable computeTask = new Runnable() {
|
Runnable computeTask = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
synchronized (topKNodes) {
|
synchronized (sortedNodes) {
|
||||||
topKNodes.clear();
|
sortedNodes.clear();
|
||||||
topKNodes.addAll(computeTopKNodes());
|
sortedNodes.addAll(sortNodes());
|
||||||
|
if (thresholdCalculator != null) {
|
||||||
|
thresholdCalculator.update();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
TopKNodeSelector(int k, TopKComparator comparator) {
|
NodeQueueLoadMonitor(LoadComparator comparator) {
|
||||||
this.k = k;
|
this.sortedNodes = new ArrayList<>();
|
||||||
this.topKNodes = new ArrayList<>();
|
|
||||||
this.comparator = comparator;
|
this.comparator = comparator;
|
||||||
this.scheduledExecutor = null;
|
this.scheduledExecutor = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TopKNodeSelector(int k, long nodeComputationInterval,
|
public NodeQueueLoadMonitor(long nodeComputationInterval,
|
||||||
TopKComparator comparator) {
|
LoadComparator comparator) {
|
||||||
this.k = k;
|
this.sortedNodes = new ArrayList<>();
|
||||||
this.topKNodes = new ArrayList<>();
|
|
||||||
this.scheduledExecutor = Executors.newScheduledThreadPool(1);
|
this.scheduledExecutor = Executors.newScheduledThreadPool(1);
|
||||||
this.comparator = comparator;
|
this.comparator = comparator;
|
||||||
this.scheduledExecutor.scheduleAtFixedRate(computeTask,
|
this.scheduledExecutor.scheduleAtFixedRate(computeTask,
|
||||||
|
@ -120,12 +134,32 @@ public class TopKNodeSelector implements ClusterMonitor {
|
||||||
TimeUnit.MILLISECONDS);
|
TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<NodeId> getSortedNodes() {
|
||||||
|
return sortedNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public QueueLimitCalculator getThresholdCalculator() {
|
||||||
|
return thresholdCalculator;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<NodeId, ClusterNode> getClusterNodes() {
|
||||||
|
return clusterNodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
Comparator<ClusterNode> getComparator() {
|
||||||
|
return comparator;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void initThresholdCalculator(float sigma, int limitMin, int limitMax) {
|
||||||
|
this.thresholdCalculator =
|
||||||
|
new QueueLimitCalculator(this, sigma, limitMin, limitMax);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addNode(List<NMContainerStatus> containerStatuses, RMNode
|
public void addNode(List<NMContainerStatus> containerStatuses, RMNode
|
||||||
rmNode) {
|
rmNode) {
|
||||||
LOG.debug("Node added event from: " + rmNode.getNode().getName());
|
LOG.debug("Node added event from: " + rmNode.getNode().getName());
|
||||||
// Ignoring this currently : atleast one NODE_UPDATE heartbeat is
|
// Ignoring this currently : at least one NODE_UPDATE heartbeat is
|
||||||
// required to ensure node eligibility.
|
// required to ensure node eligibility.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,24 +177,24 @@ public class TopKNodeSelector implements ClusterMonitor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void nodeUpdate(RMNode rmNode) {
|
public void updateNode(RMNode rmNode) {
|
||||||
LOG.debug("Node update event from: " + rmNode.getNodeID());
|
LOG.debug("Node update event from: " + rmNode.getNodeID());
|
||||||
QueuedContainersStatus queuedContainersStatus =
|
QueuedContainersStatus queuedContainersStatus =
|
||||||
rmNode.getQueuedContainersStatus();
|
rmNode.getQueuedContainersStatus();
|
||||||
int estimatedQueueWaitTime =
|
int estimatedQueueWaitTime =
|
||||||
queuedContainersStatus.getEstimatedQueueWaitTime();
|
queuedContainersStatus.getEstimatedQueueWaitTime();
|
||||||
int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
|
int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
|
||||||
// Add nodes to clusterNodes.. if estimatedQueueTime is -1, Ignore node
|
// Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
|
||||||
// UNLESS comparator is based on queue length, in which case, we should add
|
// UNLESS comparator is based on queue length.
|
||||||
synchronized (this.clusterNodes) {
|
synchronized (this.clusterNodes) {
|
||||||
ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
|
ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
|
||||||
if (currentNode == null) {
|
if (currentNode == null) {
|
||||||
if (estimatedQueueWaitTime != -1
|
if (estimatedQueueWaitTime != -1
|
||||||
|| comparator == TopKComparator.QUEUE_LENGTH) {
|
|| comparator == LoadComparator.QUEUE_LENGTH) {
|
||||||
this.clusterNodes.put(rmNode.getNodeID(),
|
this.clusterNodes.put(rmNode.getNodeID(),
|
||||||
new ClusterNode(rmNode.getNodeID())
|
new ClusterNode(rmNode.getNodeID())
|
||||||
.setQueueTime(estimatedQueueWaitTime)
|
.setQueueWaitTime(estimatedQueueWaitTime)
|
||||||
.setWaitQueueLength(waitQueueLength));
|
.setQueueLength(waitQueueLength));
|
||||||
LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
|
LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
|
||||||
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
|
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
|
||||||
"wait queue length [" + waitQueueLength + "]");
|
"wait queue length [" + waitQueueLength + "]");
|
||||||
|
@ -171,10 +205,10 @@ public class TopKNodeSelector implements ClusterMonitor {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (estimatedQueueWaitTime != -1
|
if (estimatedQueueWaitTime != -1
|
||||||
|| comparator == TopKComparator.QUEUE_LENGTH) {
|
|| comparator == LoadComparator.QUEUE_LENGTH) {
|
||||||
currentNode
|
currentNode
|
||||||
.setQueueTime(estimatedQueueWaitTime)
|
.setQueueWaitTime(estimatedQueueWaitTime)
|
||||||
.setWaitQueueLength(waitQueueLength)
|
.setQueueLength(waitQueueLength)
|
||||||
.updateTimestamp();
|
.updateTimestamp();
|
||||||
LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
|
LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
|
||||||
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
|
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
|
||||||
|
@ -182,8 +216,8 @@ public class TopKNodeSelector implements ClusterMonitor {
|
||||||
} else {
|
} else {
|
||||||
this.clusterNodes.remove(rmNode.getNodeID());
|
this.clusterNodes.remove(rmNode.getNodeID());
|
||||||
LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
|
LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
|
||||||
"with queue wait time [" + currentNode.queueTime + "] and " +
|
"with queue wait time [" + currentNode.queueWaitTime + "] and " +
|
||||||
"wait queue length [" + currentNode.waitQueueLength + "]");
|
"wait queue length [" + currentNode.queueLength + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -192,25 +226,38 @@ public class TopKNodeSelector implements ClusterMonitor {
|
||||||
@Override
|
@Override
|
||||||
public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
|
public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
|
||||||
LOG.debug("Node resource update event from: " + rmNode.getNodeID());
|
LOG.debug("Node resource update event from: " + rmNode.getNodeID());
|
||||||
// Ignoring this currently...
|
// Ignoring this currently.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns all Node Ids as ordered list from Least to Most Loaded.
|
||||||
|
* @return ordered list of nodes
|
||||||
|
*/
|
||||||
public List<NodeId> selectNodes() {
|
public List<NodeId> selectNodes() {
|
||||||
synchronized (this.topKNodes) {
|
return selectLeastLoadedNodes(-1);
|
||||||
return this.k < this.topKNodes.size() ?
|
}
|
||||||
new ArrayList<>(this.topKNodes).subList(0, this.k) :
|
|
||||||
new ArrayList<>(this.topKNodes);
|
/**
|
||||||
|
* Returns 'K' of the least Loaded Node Ids as ordered list.
|
||||||
|
* @param k max number of nodes to return
|
||||||
|
* @return ordered list of nodes
|
||||||
|
*/
|
||||||
|
public List<NodeId> selectLeastLoadedNodes(int k) {
|
||||||
|
synchronized (this.sortedNodes) {
|
||||||
|
return ((k < this.sortedNodes.size()) && (k >= 0)) ?
|
||||||
|
new ArrayList<>(this.sortedNodes).subList(0, k) :
|
||||||
|
new ArrayList<>(this.sortedNodes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<NodeId> computeTopKNodes() {
|
private List<NodeId> sortNodes() {
|
||||||
synchronized (this.clusterNodes) {
|
synchronized (this.clusterNodes) {
|
||||||
ArrayList aList = new ArrayList<>(this.clusterNodes.values());
|
ArrayList aList = new ArrayList<>(this.clusterNodes.values());
|
||||||
List<NodeId> retList = new ArrayList<>();
|
List<NodeId> retList = new ArrayList<>();
|
||||||
Object[] nodes = aList.toArray();
|
Object[] nodes = aList.toArray();
|
||||||
// Collections.sort would do something similar by calling Arrays.sort
|
// Collections.sort would do something similar by calling Arrays.sort
|
||||||
// internally but would finally iterate through the input list (aList)
|
// internally but would finally iterate through the input list (aList)
|
||||||
// to reset the value of each element.. Since we don't really care about
|
// to reset the value of each element. Since we don't really care about
|
||||||
// 'aList', we can use the iteration to create the list of nodeIds which
|
// 'aList', we can use the iteration to create the list of nodeIds which
|
||||||
// is what we ultimately care about.
|
// is what we ultimately care about.
|
||||||
Arrays.sort(nodes, (Comparator)comparator);
|
Arrays.sort(nodes, (Comparator)comparator);
|
||||||
|
@ -220,4 +267,5 @@ public class TopKNodeSelector implements ClusterMonitor {
|
||||||
return retList;
|
return retList;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,125 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.ClusterNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.LoadComparator;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class interacts with the NodeQueueLoadMonitor to keep track of the
|
||||||
|
* mean and standard deviation of the configured metrics (queue length or queue
|
||||||
|
* wait time) used to characterize the queue load of a specific node.
|
||||||
|
* The NodeQueueLoadMonitor triggers an update (by calling the
|
||||||
|
* <code>update()</code> method) every time it performs a re-ordering of
|
||||||
|
* all nodes.
|
||||||
|
*/
|
||||||
|
public class QueueLimitCalculator {
|
||||||
|
|
||||||
|
class Stats {
|
||||||
|
private final AtomicInteger mean = new AtomicInteger(0);
|
||||||
|
private final AtomicInteger stdev = new AtomicInteger(0);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Not thread safe. Caller should synchronize on sorted nodes list.
|
||||||
|
*/
|
||||||
|
void update() {
|
||||||
|
List<NodeId> sortedNodes = nodeSelector.getSortedNodes();
|
||||||
|
if (sortedNodes.size() > 0) {
|
||||||
|
// Calculate mean
|
||||||
|
int sum = 0;
|
||||||
|
for (NodeId n : sortedNodes) {
|
||||||
|
sum += getMetric(getNode(n));
|
||||||
|
}
|
||||||
|
mean.set(sum / sortedNodes.size());
|
||||||
|
|
||||||
|
// Calculate stdev
|
||||||
|
int sqrSumMean = 0;
|
||||||
|
for (NodeId n : sortedNodes) {
|
||||||
|
int val = getMetric(getNode(n));
|
||||||
|
sqrSumMean += Math.pow(val - mean.get(), 2);
|
||||||
|
}
|
||||||
|
stdev.set(
|
||||||
|
(int) Math.round(Math.sqrt(
|
||||||
|
sqrSumMean / (float) sortedNodes.size())));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ClusterNode getNode(NodeId nId) {
|
||||||
|
return nodeSelector.getClusterNodes().get(nId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getMetric(ClusterNode cn) {
|
||||||
|
return (cn != null) ? ((LoadComparator)nodeSelector.getComparator())
|
||||||
|
.getMetric(cn) : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMean() {
|
||||||
|
return mean.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getStdev() {
|
||||||
|
return stdev.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final NodeQueueLoadMonitor nodeSelector;
|
||||||
|
private final float sigma;
|
||||||
|
private final int rangeMin;
|
||||||
|
private final int rangeMax;
|
||||||
|
private final Stats stats = new Stats();
|
||||||
|
|
||||||
|
QueueLimitCalculator(NodeQueueLoadMonitor selector, float sigma,
|
||||||
|
int rangeMin, int rangeMax) {
|
||||||
|
this.nodeSelector = selector;
|
||||||
|
this.sigma = sigma;
|
||||||
|
this.rangeMax = rangeMax;
|
||||||
|
this.rangeMin = rangeMin;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int determineThreshold() {
|
||||||
|
return (int) (stats.getMean() + sigma * stats.getStdev());
|
||||||
|
}
|
||||||
|
|
||||||
|
void update() {
|
||||||
|
this.stats.update();
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getThreshold() {
|
||||||
|
int thres = determineThreshold();
|
||||||
|
return Math.min(rangeMax, Math.max(rangeMin, thres));
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContainerQueuingLimit createContainerQueuingLimit() {
|
||||||
|
ContainerQueuingLimit containerQueuingLimit =
|
||||||
|
ContainerQueuingLimit.newInstance();
|
||||||
|
if (nodeSelector.getComparator() == LoadComparator.QUEUE_WAIT_TIME) {
|
||||||
|
containerQueuingLimit.setMaxQueueWaitTimeInMs(getThreshold());
|
||||||
|
containerQueuingLimit.setMaxQueueLength(-1);
|
||||||
|
} else {
|
||||||
|
containerQueuingLimit.setMaxQueueWaitTimeInMs(-1);
|
||||||
|
containerQueuingLimit.setMaxQueueLength(getThreshold());
|
||||||
|
}
|
||||||
|
return containerQueuingLimit;
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -27,7 +28,10 @@ import org.mockito.Mockito;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class TestTopKNodeSelector {
|
/**
|
||||||
|
* Unit tests for NodeQueueLoadMonitor.
|
||||||
|
*/
|
||||||
|
public class TestNodeQueueLoadMonitor {
|
||||||
|
|
||||||
static class FakeNodeId extends NodeId {
|
static class FakeNodeId extends NodeId {
|
||||||
final String host;
|
final String host;
|
||||||
|
@ -62,12 +66,12 @@ public class TestTopKNodeSelector {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueTimeSort() {
|
public void testWaitTimeSort() {
|
||||||
TopKNodeSelector selector = new TopKNodeSelector(5,
|
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
|
||||||
TopKNodeSelector.TopKComparator.WAIT_TIME);
|
NodeQueueLoadMonitor.LoadComparator.QUEUE_WAIT_TIME);
|
||||||
selector.nodeUpdate(createRMNode("h1", 1, 15, 10));
|
selector.updateNode(createRMNode("h1", 1, 15, 10));
|
||||||
selector.nodeUpdate(createRMNode("h2", 2, 5, 10));
|
selector.updateNode(createRMNode("h2", 2, 5, 10));
|
||||||
selector.nodeUpdate(createRMNode("h3", 3, 10, 10));
|
selector.updateNode(createRMNode("h3", 3, 10, 10));
|
||||||
selector.computeTask.run();
|
selector.computeTask.run();
|
||||||
List<NodeId> nodeIds = selector.selectNodes();
|
List<NodeId> nodeIds = selector.selectNodes();
|
||||||
System.out.println("1-> " + nodeIds);
|
System.out.println("1-> " + nodeIds);
|
||||||
|
@ -76,7 +80,7 @@ public class TestTopKNodeSelector {
|
||||||
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
|
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
|
||||||
|
|
||||||
// Now update node3
|
// Now update node3
|
||||||
selector.nodeUpdate(createRMNode("h3", 3, 2, 10));
|
selector.updateNode(createRMNode("h3", 3, 2, 10));
|
||||||
selector.computeTask.run();
|
selector.computeTask.run();
|
||||||
nodeIds = selector.selectNodes();
|
nodeIds = selector.selectNodes();
|
||||||
System.out.println("2-> "+ nodeIds);
|
System.out.println("2-> "+ nodeIds);
|
||||||
|
@ -85,7 +89,7 @@ public class TestTopKNodeSelector {
|
||||||
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
|
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
|
||||||
|
|
||||||
// Now send update with -1 wait time
|
// Now send update with -1 wait time
|
||||||
selector.nodeUpdate(createRMNode("h4", 4, -1, 10));
|
selector.updateNode(createRMNode("h4", 4, -1, 10));
|
||||||
selector.computeTask.run();
|
selector.computeTask.run();
|
||||||
nodeIds = selector.selectNodes();
|
nodeIds = selector.selectNodes();
|
||||||
System.out.println("3-> "+ nodeIds);
|
System.out.println("3-> "+ nodeIds);
|
||||||
|
@ -97,11 +101,11 @@ public class TestTopKNodeSelector {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueLengthSort() {
|
public void testQueueLengthSort() {
|
||||||
TopKNodeSelector selector = new TopKNodeSelector(5,
|
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
|
||||||
TopKNodeSelector.TopKComparator.QUEUE_LENGTH);
|
NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
|
||||||
selector.nodeUpdate(createRMNode("h1", 1, -1, 15));
|
selector.updateNode(createRMNode("h1", 1, -1, 15));
|
||||||
selector.nodeUpdate(createRMNode("h2", 2, -1, 5));
|
selector.updateNode(createRMNode("h2", 2, -1, 5));
|
||||||
selector.nodeUpdate(createRMNode("h3", 3, -1, 10));
|
selector.updateNode(createRMNode("h3", 3, -1, 10));
|
||||||
selector.computeTask.run();
|
selector.computeTask.run();
|
||||||
List<NodeId> nodeIds = selector.selectNodes();
|
List<NodeId> nodeIds = selector.selectNodes();
|
||||||
System.out.println("1-> " + nodeIds);
|
System.out.println("1-> " + nodeIds);
|
||||||
|
@ -110,7 +114,7 @@ public class TestTopKNodeSelector {
|
||||||
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
|
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
|
||||||
|
|
||||||
// Now update node3
|
// Now update node3
|
||||||
selector.nodeUpdate(createRMNode("h3", 3, -1, 2));
|
selector.updateNode(createRMNode("h3", 3, -1, 2));
|
||||||
selector.computeTask.run();
|
selector.computeTask.run();
|
||||||
nodeIds = selector.selectNodes();
|
nodeIds = selector.selectNodes();
|
||||||
System.out.println("2-> "+ nodeIds);
|
System.out.println("2-> "+ nodeIds);
|
||||||
|
@ -119,7 +123,7 @@ public class TestTopKNodeSelector {
|
||||||
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
|
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
|
||||||
|
|
||||||
// Now send update with -1 wait time but valid length
|
// Now send update with -1 wait time but valid length
|
||||||
selector.nodeUpdate(createRMNode("h4", 4, -1, 20));
|
selector.updateNode(createRMNode("h4", 4, -1, 20));
|
||||||
selector.computeTask.run();
|
selector.computeTask.run();
|
||||||
nodeIds = selector.selectNodes();
|
nodeIds = selector.selectNodes();
|
||||||
System.out.println("3-> "+ nodeIds);
|
System.out.println("3-> "+ nodeIds);
|
||||||
|
@ -130,6 +134,50 @@ public class TestTopKNodeSelector {
|
||||||
Assert.assertEquals("h4:4", nodeIds.get(3).toString());
|
Assert.assertEquals("h4:4", nodeIds.get(3).toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerQueuingLimit() {
|
||||||
|
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
|
||||||
|
NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
|
||||||
|
selector.updateNode(createRMNode("h1", 1, -1, 15));
|
||||||
|
selector.updateNode(createRMNode("h2", 2, -1, 5));
|
||||||
|
selector.updateNode(createRMNode("h3", 3, -1, 10));
|
||||||
|
|
||||||
|
// Test Mean Calculation
|
||||||
|
selector.initThresholdCalculator(0, 6, 100);
|
||||||
|
QueueLimitCalculator calculator = selector.getThresholdCalculator();
|
||||||
|
ContainerQueuingLimit containerQueuingLimit = calculator
|
||||||
|
.createContainerQueuingLimit();
|
||||||
|
Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength());
|
||||||
|
Assert.assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs());
|
||||||
|
selector.computeTask.run();
|
||||||
|
containerQueuingLimit = calculator.createContainerQueuingLimit();
|
||||||
|
Assert.assertEquals(10, containerQueuingLimit.getMaxQueueLength());
|
||||||
|
Assert.assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs());
|
||||||
|
|
||||||
|
// Test Limits do not exceed specified max
|
||||||
|
selector.updateNode(createRMNode("h1", 1, -1, 110));
|
||||||
|
selector.updateNode(createRMNode("h2", 2, -1, 120));
|
||||||
|
selector.updateNode(createRMNode("h3", 3, -1, 130));
|
||||||
|
selector.updateNode(createRMNode("h4", 4, -1, 140));
|
||||||
|
selector.updateNode(createRMNode("h5", 5, -1, 150));
|
||||||
|
selector.updateNode(createRMNode("h6", 6, -1, 160));
|
||||||
|
selector.computeTask.run();
|
||||||
|
containerQueuingLimit = calculator.createContainerQueuingLimit();
|
||||||
|
Assert.assertEquals(100, containerQueuingLimit.getMaxQueueLength());
|
||||||
|
|
||||||
|
// Test Limits do not go below specified min
|
||||||
|
selector.updateNode(createRMNode("h1", 1, -1, 1));
|
||||||
|
selector.updateNode(createRMNode("h2", 2, -1, 2));
|
||||||
|
selector.updateNode(createRMNode("h3", 3, -1, 3));
|
||||||
|
selector.updateNode(createRMNode("h4", 4, -1, 4));
|
||||||
|
selector.updateNode(createRMNode("h5", 5, -1, 5));
|
||||||
|
selector.updateNode(createRMNode("h6", 6, -1, 6));
|
||||||
|
selector.computeTask.run();
|
||||||
|
containerQueuingLimit = calculator.createContainerQueuingLimit();
|
||||||
|
Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private RMNode createRMNode(String host, int port,
|
private RMNode createRMNode(String host, int port,
|
||||||
int waitTime, int queueLength) {
|
int waitTime, int queueLength) {
|
||||||
RMNode node1 = Mockito.mock(RMNode.class);
|
RMNode node1 = Mockito.mock(RMNode.class);
|
Loading…
Reference in New Issue