YARN-2995. Enhance UI to show cluster resource utilization of various container Execution types. (Konstantinos Karanasos via asuresh)

(cherry picked from commit 0aafc122d4)
(cherry picked from commit 25598b6e7118bacd33a167fb496e9cda5e9f9ad0)
This commit is contained in:
Arun Suresh 2016-11-04 07:31:54 -07:00
parent 28f3bb38bf
commit 7437252102
31 changed files with 600 additions and 235 deletions

View File

@ -233,6 +233,8 @@ public class RMContainerAllocator extends RMContainerRequestor
this.scheduledRequests.setNumOpportunisticMapsPer100(
conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100,
MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PER_100));
LOG.info(this.scheduledRequests.getNumOpportunisticMapsPer100() +
"% of the mappers will be scheduled using OPPORTUNISTIC containers");
}
@Override
@ -1030,6 +1032,10 @@ public class RMContainerAllocator extends RMContainerRequestor
this.numOpportunisticMapsPer100 = numMaps;
}
int getNumOpportunisticMapsPer100() {
return this.numOpportunisticMapsPer100;
}
@VisibleForTesting
final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
new LinkedHashMap<TaskAttemptId, ContainerRequest>();

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -191,7 +191,7 @@ public class NodeInfo {
return null;
}
public QueuedContainersStatus getQueuedContainersStatus() {
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
return null;
}

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -180,7 +180,7 @@ public class RMNodeWrapper implements RMNode {
return Collections.EMPTY_LIST;
}
public QueuedContainersStatus getQueuedContainersStatus() {
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
return null;
}

View File

@ -125,10 +125,11 @@ public abstract class NodeStatus {
@Private
@Unstable
public abstract QueuedContainersStatus getQueuedContainersStatus();
public abstract OpportunisticContainersStatus
getOpportunisticContainersStatus();
@Private
@Unstable
public abstract void setQueuedContainersStatus(
QueuedContainersStatus queuedContainersStatus);
public abstract void setOpportunisticContainersStatus(
OpportunisticContainersStatus opportunisticContainersStatus);
}

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* <p> <code>OpportunisticContainersStatus</code> captures information
* pertaining to the state of execution of the opportunistic containers within a
* node. </p>
*/
@Private
@Unstable
public abstract class OpportunisticContainersStatus {
public static OpportunisticContainersStatus newInstance() {
return Records.newRecord(OpportunisticContainersStatus.class);
}
/**
* Returns the number of currently running opportunistic containers on the
* node.
*
* @return number of running opportunistic containers.
*/
@Private
@Unstable
public abstract int getRunningOpportContainers();
/**
* Sets the number of running opportunistic containers.
*
* @param runningOpportContainers number of running opportunistic containers.
*/
@Private
@Unstable
public abstract void setRunningOpportContainers(int runningOpportContainers);
/**
* Returns memory currently used on the node for running opportunistic
* containers.
*
* @return memory (in bytes) used for running opportunistic containers.
*/
@Private
@Unstable
public abstract long getOpportMemoryUsed();
/**
* Sets the memory used on the node for running opportunistic containers.
*
* @param opportMemoryUsed memory (in bytes) used for running opportunistic
* containers.
*/
@Private
@Unstable
public abstract void setOpportMemoryUsed(long opportMemoryUsed);
/**
* Returns CPU cores currently used on the node for running opportunistic
* containers.
*
* @return CPU cores used for running opportunistic containers.
*/
@Private
@Unstable
public abstract int getOpportCoresUsed();
/**
* Sets the CPU cores used on the node for running opportunistic containers.
*
* @param opportCoresUsed memory (in bytes) used for running opportunistic
* containers.
*/
@Private
@Unstable
public abstract void setOpportCoresUsed(int opportCoresUsed);
/**
* Returns the number of queued opportunistic containers on the node.
*
* @return number of queued opportunistic containers.
*/
@Private
@Unstable
public abstract int getQueuedOpportContainers();
/**
* Sets the number of queued opportunistic containers on the node.
*
* @param queuedOpportContainers number of queued opportunistic containers.
*/
@Private
@Unstable
public abstract void setQueuedOpportContainers(int queuedOpportContainers);
/**
* Returns the length of the containers queue on the node.
*
* @return length of the containers queue.
*/
@Private
@Unstable
public abstract int getWaitQueueLength();
/**
* Sets the length of the containers queue on the node.
*
* @param waitQueueLength length of the containers queue.
*/
@Private
@Unstable
public abstract void setWaitQueueLength(int waitQueueLength);
/**
* Returns the estimated time that a container will have to wait if added to
* the queue of the node.
*
* @return estimated queuing time.
*/
@Private
@Unstable
public abstract int getEstimatedQueueWaitTime();
/**
* Sets the estimated time that a container will have to wait if added to the
* queue of the node.
*
* @param queueWaitTime estimated queuing time.
*/
@Private
@Unstable
public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
}

View File

@ -1,45 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>
* <code>QueuedContainersStatus</code> captures information pertaining to the
* state of execution of the Queueable containers within a node.
* </p>
*/
@Private
@Evolving
public abstract class QueuedContainersStatus {
public static QueuedContainersStatus newInstance() {
return Records.newRecord(QueuedContainersStatus.class);
}
public abstract int getEstimatedQueueWaitTime();
public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
public abstract int getWaitQueueLength();
public abstract void setWaitQueueLength(int waitQueueLength);
}

View File

@ -41,9 +41,9 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.QueuedContainersStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.OpportunisticContainersStatusProto;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -404,25 +404,25 @@ public class NodeStatusPBImpl extends NodeStatus {
}
@Override
public synchronized QueuedContainersStatus getQueuedContainersStatus() {
NodeStatusProtoOrBuilder p =
this.viaProto ? this.proto : this.builder;
if (!p.hasQueuedContainerStatus()) {
public synchronized OpportunisticContainersStatus
getOpportunisticContainersStatus() {
NodeStatusProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
if (!p.hasOpportunisticContainersStatus()) {
return null;
}
return convertFromProtoFormat(p.getQueuedContainerStatus());
return convertFromProtoFormat(p.getOpportunisticContainersStatus());
}
@Override
public synchronized void setQueuedContainersStatus(
QueuedContainersStatus queuedContainersStatus) {
public synchronized void setOpportunisticContainersStatus(
OpportunisticContainersStatus opportunisticContainersStatus) {
maybeInitBuilder();
if (queuedContainersStatus == null) {
this.builder.clearQueuedContainerStatus();
if (opportunisticContainersStatus == null) {
this.builder.clearOpportunisticContainersStatus();
return;
}
this.builder.setQueuedContainerStatus(
convertToProtoFormat(queuedContainersStatus));
this.builder.setOpportunisticContainersStatus(
convertToProtoFormat(opportunisticContainersStatus));
}
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
@ -468,14 +468,14 @@ public class NodeStatusPBImpl extends NodeStatus {
return new ResourceUtilizationPBImpl(p);
}
private QueuedContainersStatusProto convertToProtoFormat(
QueuedContainersStatus r) {
return ((QueuedContainersStatusPBImpl) r).getProto();
private OpportunisticContainersStatusProto convertToProtoFormat(
OpportunisticContainersStatus r) {
return ((OpportunisticContainersStatusPBImpl) r).getProto();
}
private QueuedContainersStatus convertFromProtoFormat(
QueuedContainersStatusProto p) {
return new QueuedContainersStatusPBImpl(p);
private OpportunisticContainersStatus convertFromProtoFormat(
OpportunisticContainersStatusProto p) {
return new OpportunisticContainersStatusPBImpl(p);
}
private ContainerPBImpl convertFromProtoFormat(

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records.impl.pb;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
/**
* Protocol Buffer implementation of OpportunisticContainersStatus.
*/
public class OpportunisticContainersStatusPBImpl
extends OpportunisticContainersStatus {
private YarnServerCommonProtos.OpportunisticContainersStatusProto proto =
YarnServerCommonProtos.OpportunisticContainersStatusProto
.getDefaultInstance();
private YarnServerCommonProtos.OpportunisticContainersStatusProto.Builder
builder = null;
private boolean viaProto = false;
public OpportunisticContainersStatusPBImpl() {
builder =
YarnServerCommonProtos.OpportunisticContainersStatusProto.newBuilder();
}
public OpportunisticContainersStatusPBImpl(YarnServerCommonProtos
.OpportunisticContainersStatusProto proto) {
this.proto = proto;
viaProto = true;
}
public YarnServerCommonProtos.OpportunisticContainersStatusProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = YarnServerCommonProtos.OpportunisticContainersStatusProto
.newBuilder(proto);
}
viaProto = false;
}
@Override
public int getRunningOpportContainers() {
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getRunningOpportContainers();
}
@Override
public void setRunningOpportContainers(int runningOpportContainers) {
maybeInitBuilder();
builder.setRunningOpportContainers(runningOpportContainers);
}
@Override
public long getOpportMemoryUsed() {
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getOpportMemoryUsed();
}
@Override
public void setOpportMemoryUsed(long opportMemoryUsed) {
maybeInitBuilder();
builder.setOpportMemoryUsed(opportMemoryUsed);
}
@Override
public int getOpportCoresUsed() {
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getOpportCoresUsed();
}
@Override
public void setOpportCoresUsed(int opportCoresUsed) {
maybeInitBuilder();
builder.setOpportCoresUsed(opportCoresUsed);
}
@Override
public int getQueuedOpportContainers() {
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getQueuedOpportContainers();
}
@Override
public void setQueuedOpportContainers(int queuedOpportContainers) {
maybeInitBuilder();
builder.setQueuedOpportContainers(queuedOpportContainers);
}
@Override
public int getWaitQueueLength() {
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getWaitQueueLength();
}
@Override
public void setWaitQueueLength(int waitQueueLength) {
maybeInitBuilder();
builder.setWaitQueueLength(waitQueueLength);
}
@Override
public int getEstimatedQueueWaitTime() {
YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getEstimatedQueueWaitTime();
}
@Override
public void setEstimatedQueueWaitTime(int queueWaitTime) {
maybeInitBuilder();
builder.setEstimatedQueueWaitTime(queueWaitTime);
}
}

View File

@ -1,84 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records.impl.pb;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
/**
* Protocol Buffer implementation of QueuedContainersStatus.
*/
public class QueuedContainersStatusPBImpl extends QueuedContainersStatus {
private YarnServerCommonProtos.QueuedContainersStatusProto proto =
YarnServerCommonProtos.QueuedContainersStatusProto.getDefaultInstance();
private YarnServerCommonProtos.QueuedContainersStatusProto.Builder builder =
null;
private boolean viaProto = false;
public QueuedContainersStatusPBImpl() {
builder = YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder();
}
public QueuedContainersStatusPBImpl(YarnServerCommonProtos
.QueuedContainersStatusProto proto) {
this.proto = proto;
viaProto = true;
}
public YarnServerCommonProtos.QueuedContainersStatusProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder =
YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int getEstimatedQueueWaitTime() {
YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getEstimatedQueueWaitTime();
}
@Override
public void setEstimatedQueueWaitTime(int queueWaitTime) {
maybeInitBuilder();
builder.setEstimatedQueueWaitTime(queueWaitTime);
}
@Override
public int getWaitQueueLength() {
YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
viaProto ? proto : builder;
return p.getWaitQueueLength();
}
@Override
public void setWaitQueueLength(int waitQueueLength) {
maybeInitBuilder();
builder.setWaitQueueLength(waitQueueLength);
}
}

View File

@ -39,12 +39,16 @@ message NodeStatusProto {
optional ResourceUtilizationProto containers_utilization = 6;
optional ResourceUtilizationProto node_utilization = 7;
repeated ContainerProto increased_containers = 8;
optional QueuedContainersStatusProto queued_container_status = 9;
optional OpportunisticContainersStatusProto opportunistic_containers_status = 9;
}
message QueuedContainersStatusProto {
optional int32 estimated_queue_wait_time = 1;
optional int32 wait_queue_length = 2;
message OpportunisticContainersStatusProto {
optional int32 running_opport_containers = 1;
optional int64 opport_memory_used = 2;
optional int32 opport_cores_used = 3;
optional int32 queued_opport_containers = 4;
optional int32 wait_queue_length = 5;
optional int32 estimated_queue_wait_time = 6;
}
message MasterKeyProto {

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRe
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@ -146,11 +146,11 @@ public class TestProtocolRecords {
Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus nodeStatus =
Records.newRecord(NodeStatus.class);
QueuedContainersStatus queuedContainersStatus = Records.newRecord
(QueuedContainersStatus.class);
queuedContainersStatus.setEstimatedQueueWaitTime(123);
queuedContainersStatus.setWaitQueueLength(321);
nodeStatus.setQueuedContainersStatus(queuedContainersStatus);
OpportunisticContainersStatus opportunisticContainersStatus =
Records.newRecord(OpportunisticContainersStatus.class);
opportunisticContainersStatus.setEstimatedQueueWaitTime(123);
opportunisticContainersStatus.setWaitQueueLength(321);
nodeStatus.setOpportunisticContainersStatus(opportunisticContainersStatus);
record.setNodeStatus(nodeStatus);
NodeHeartbeatRequestPBImpl pb = new
@ -159,9 +159,10 @@ public class TestProtocolRecords {
Assert.assertEquals(123,
pb.getNodeStatus()
.getQueuedContainersStatus().getEstimatedQueueWaitTime());
.getOpportunisticContainersStatus().getEstimatedQueueWaitTime());
Assert.assertEquals(321,
pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength());
pb.getNodeStatus().getOpportunisticContainersStatus()
.getWaitQueueLength());
}
@Test

View File

@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@ -478,16 +478,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
createKeepAliveApplicationList(), nodeHealthStatus,
containersUtilization, nodeUtilization, increasedContainers);
nodeStatus.setQueuedContainersStatus(getQueuedContainerStatus());
nodeStatus.setOpportunisticContainersStatus(
getOpportunisticContainersStatus());
return nodeStatus;
}
private QueuedContainersStatus getQueuedContainerStatus() {
QueuedContainersStatus status = QueuedContainersStatus.newInstance();
status.setWaitQueueLength(
this.context.getQueuingContext().getQueuedContainers().size());
/**
* Get the status of the OPPORTUNISTIC containers.
* @return the status of the OPPORTUNISTIC containers.
*/
private OpportunisticContainersStatus getOpportunisticContainersStatus() {
OpportunisticContainersStatus status =
this.context.getContainerManager().getOpportunisticContainersStatus();
return status;
}
/**
* Get the aggregated utilization of the containers in this node.
* @return Resource utilization of all the containers.

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
.ContainersMonitor;
@ -35,6 +36,8 @@ public interface ContainerManager extends ServiceStateChangeListener,
ContainersMonitor getContainersMonitor();
OpportunisticContainersStatus getOpportunisticContainersStatus();
void updateQueuingLimit(ContainerQueuingLimit queuingLimit);
void setBlockNewContainerRequests(boolean blockNewContainerRequests);

View File

@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
@ -1443,6 +1444,11 @@ public class ContainerManagerImpl extends CompositeService implements
return serviceStopped;
}
@Override
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
return null;
}
@Override
public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) {
LOG.trace("Implementation does not support queuing of Containers!!");

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@ -77,10 +78,14 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
private ConcurrentMap<ContainerId, AllocatedContainerInfo>
allocatedOpportunisticContainers;
private long allocatedMemoryOpportunistic;
private int allocatedVCoresOpportunistic;
private Queue<AllocatedContainerInfo> queuedGuaranteedContainers;
private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
private Set<ContainerId> opportunisticContainersToKill;
private final OpportunisticContainersStatus opportunisticContainersStatus;
private final ContainerQueuingLimit queuingLimit;
public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
@ -90,10 +95,14 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
dirsHandler);
this.allocatedGuaranteedContainers = new ConcurrentHashMap<>();
this.allocatedOpportunisticContainers = new ConcurrentHashMap<>();
this.allocatedMemoryOpportunistic = 0;
this.allocatedVCoresOpportunistic = 0;
this.queuedGuaranteedContainers = new ConcurrentLinkedQueue<>();
this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
this.opportunisticContainersToKill = Collections.synchronizedSet(
new HashSet<ContainerId>());
this.opportunisticContainersStatus =
OpportunisticContainersStatus.newInstance();
this.queuingLimit = ContainerQueuingLimit.newInstance();
}
@ -189,6 +198,8 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
} else {
allocatedOpportunisticContainers.put(pti.getContainerId(),
allocatedContainerInfo);
allocatedMemoryOpportunistic += pti.getPmemLimit();
allocatedVCoresOpportunistic += pti.getCpuVcores();
}
getContainersMonitor().increaseContainersAllocation(pti);
@ -260,6 +271,11 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
if (contToRemove != null) {
getContainersMonitor().decreaseContainersAllocation(contToRemove
.getPti());
if (contToRemove.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
allocatedMemoryOpportunistic -= contToRemove.getPti().getPmemLimit();
allocatedVCoresOpportunistic -= contToRemove.getPti().getCpuVcores();
}
}
}
@ -517,6 +533,22 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
}
}
@Override
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
opportunisticContainersStatus
.setRunningOpportContainers(allocatedOpportunisticContainers.size());
opportunisticContainersStatus
.setOpportMemoryUsed(allocatedMemoryOpportunistic);
opportunisticContainersStatus
.setOpportCoresUsed(allocatedVCoresOpportunistic);
opportunisticContainersStatus
.setQueuedOpportContainers(queuedOpportunisticContainers.size());
opportunisticContainersStatus.setWaitQueueLength(
queuedGuaranteedContainers.size() +
queuedOpportunisticContainers.size());
return opportunisticContainersStatus;
}
@Override
public void updateQueuingLimit(ContainerQueuingLimit limit) {
this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());

View File

@ -86,6 +86,7 @@ public class ContainerPage extends NMView implements YarnWebParams {
._("User", info.getUser())
._("TotalMemoryNeeded", info.getMemoryNeeded())
._("TotalVCoresNeeded", info.getVCoresNeeded())
._("ExecutionType", info.getExecutionType())
._("logs", info.getShortLogLink(), "Link to logs");
html._(InfoBlock.class);
}

View File

@ -46,6 +46,7 @@ public class ContainerInfo {
protected String user;
protected long totalMemoryNeededMB;
protected long totalVCoresNeeded;
private String executionType;
protected String containerLogsLink;
protected String nodeId;
@XmlTransient
@ -84,6 +85,8 @@ public class ContainerInfo {
this.totalMemoryNeededMB = res.getMemorySize();
this.totalVCoresNeeded = res.getVirtualCores();
}
this.executionType =
container.getContainerTokenIdentifier().getExecutionType().name();
this.containerLogsShortLink = ujoin("containerlogs", this.id,
container.getUser());
@ -143,6 +146,10 @@ public class ContainerInfo {
return this.totalVCoresNeeded;
}
public String getExecutionType() {
return this.executionType;
}
public List<String> getContainerLogFiles() {
return this.containerLogFiles;
}

View File

@ -507,7 +507,7 @@ public class TestNMWebServicesContainers extends JerseyTestBase {
public void verifyNodeContainerInfo(JSONObject info, Container cont)
throws JSONException, Exception {
assertEquals("incorrect number of elements", 10, info.length());
assertEquals("incorrect number of elements", 11, info.length());
verifyNodeContainerInfoGeneric(cont, info.getString("id"),
info.getString("state"), info.getString("user"),

View File

@ -81,8 +81,9 @@ import java.util.List;
/**
* The OpportunisticContainerAllocatorAMService is started instead of the
* ApplicationMasterService if distributed scheduling is enabled for the YARN
* cluster.
* ApplicationMasterService if opportunistic scheduling is enabled for the YARN
* cluster (either centralized or distributed opportunistic scheduling).
*
* It extends the functionality of the ApplicationMasterService by servicing
* clients (AMs and AMRMProxy request interceptors) that understand the
* DistributedSchedulingProtocol.

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
/**
* Node managers information on available resources
@ -176,7 +176,7 @@ public interface RMNode {
public List<Container> pullNewlyIncreasedContainers();
QueuedContainersStatus getQueuedContainersStatus();
OpportunisticContainersStatus getOpportunisticContainersStatus();
long getUntrackedTimeStamp();

View File

@ -61,7 +61,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
@ -137,7 +137,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private volatile Resource physicalResource;
/* Container Queue Information for the node.. Used by Distributed Scheduler */
private QueuedContainersStatus queuedContainersStatus;
private OpportunisticContainersStatus opportunisticContainersStatus;
private final ContainerAllocationExpirer containerAllocationExpirer;
/* set of containers that have just launched */
@ -1190,7 +1190,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
rmNode.setQueuedContainersStatus(statusEvent.getContainerQueueInfo());
rmNode.setOpportunisticContainersStatus(
statusEvent.getOpportunisticContainersStatus());
NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
rmNode, statusEvent);
NodeState initialState = rmNode.getState();
@ -1511,23 +1512,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.timeStamp = ts;
}
@Override
public QueuedContainersStatus getQueuedContainersStatus() {
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
this.readLock.lock();
try {
return this.queuedContainersStatus;
return this.opportunisticContainersStatus;
} finally {
this.readLock.unlock();
}
}
public void setQueuedContainersStatus(QueuedContainersStatus
queuedContainersStatus) {
public void setOpportunisticContainersStatus(
OpportunisticContainersStatus opportunisticContainersStatus) {
this.writeLock.lock();
try {
this.queuedContainersStatus = queuedContainersStatus;
this.opportunisticContainersStatus = opportunisticContainersStatus;
} finally {
this.writeLock.unlock();
}

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -80,8 +80,8 @@ public class RMNodeStatusEvent extends RMNodeEvent {
return this.logAggregationReportsForApps;
}
public QueuedContainersStatus getContainerQueueInfo() {
return this.nodeStatus.getQueuedContainersStatus();
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
return this.nodeStatus.getOpportunisticContainersStatus();
}
public void setLogAggregationReportsForApps(

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
@ -580,7 +581,7 @@ public abstract class AbstractYarnScheduler
return;
}
if (!rmContainer.isRemotelyAllocated()) {
if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
completedContainerInternal(rmContainer, containerStatus, event);
} else {
ContainerId containerId = rmContainer.getContainerId();

View File

@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -195,11 +195,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
@Override
public void updateNode(RMNode rmNode) {
LOG.debug("Node update event from: " + rmNode.getNodeID());
QueuedContainersStatus queuedContainersStatus =
rmNode.getQueuedContainersStatus();
OpportunisticContainersStatus opportunisticContainersStatus =
rmNode.getOpportunisticContainersStatus();
int estimatedQueueWaitTime =
queuedContainersStatus.getEstimatedQueueWaitTime();
int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
opportunisticContainersStatus.getEstimatedQueueWaitTime();
int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();
// Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
// UNLESS comparator is based on queue length.
ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();

View File

@ -18,17 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NODE_LABEL;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NODE_STATE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
import java.util.Collection;
import com.google.inject.Inject;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@ -42,18 +35,29 @@ import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject;
import java.util.Collection;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NODE_LABEL;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.NODE_STATE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
class NodesPage extends RmView {
static class NodesBlock extends HtmlBlock {
final ResourceManager rm;
private static final long BYTES_IN_MB = 1024 * 1024;
private static boolean opportunisticContainersEnabled;
@Inject
NodesBlock(ResourceManager rm, ViewContext ctx) {
super(ctx);
this.rm = rm;
this.opportunisticContainersEnabled = YarnConfiguration
.isOpportunisticContainerAllocationEnabled(
this.rm.getRMContext().getYarnConfiguration());
}
@Override
@ -61,9 +65,10 @@ class NodesPage extends RmView {
html._(MetricsOverviewTable.class);
ResourceScheduler sched = rm.getResourceScheduler();
String type = $(NODE_STATE);
String labelFilter = $(NODE_LABEL, CommonNodeLabelsManager.ANY).trim();
TBODY<TABLE<Hamlet>> tbody =
Hamlet.TR<Hamlet.THEAD<TABLE<Hamlet>>> trbody =
html.table("#nodes").thead().tr()
.th(".nodelabels", "Node Labels")
.th(".rack", "Rack")
@ -71,13 +76,29 @@ class NodesPage extends RmView {
.th(".nodeaddress", "Node Address")
.th(".nodehttpaddress", "Node HTTP Address")
.th(".lastHealthUpdate", "Last health-update")
.th(".healthReport", "Health-report")
.th(".containers", "Containers")
.th(".healthReport", "Health-report");
if (!this.opportunisticContainersEnabled) {
trbody.th(".containers", "Containers")
.th(".mem", "Mem Used")
.th(".mem", "Mem Avail")
.th(".vcores", "VCores Used")
.th(".vcores", "VCores Avail")
.th(".nodeManagerVersion", "Version")._()._().tbody();
.th(".vcores", "VCores Avail");
} else {
trbody.th(".containers", "Running Containers (G)")
.th(".mem", "Mem Used (G)")
.th(".mem", "Mem Avail (G)")
.th(".vcores", "VCores Used (G)")
.th(".vcores", "VCores Avail (G)")
.th(".containers", "Running Containers (O)")
.th(".mem", "Mem Used (O)")
.th(".vcores", "VCores Used (O)")
.th(".containers", "Queued Containers");
}
TBODY<TABLE<Hamlet>> tbody =
trbody.th(".nodeManagerVersion", "Version")._()._().tbody();
NodeState stateFilter = null;
if (type != null && !type.isEmpty()) {
stateFilter = NodeState.valueOf(StringUtils.toUpperCase(type));
@ -153,7 +174,23 @@ class NodesPage extends RmView {
.append("\",\"").append(String.valueOf(info.getUsedVirtualCores()))
.append("\",\"")
.append(String.valueOf(info.getAvailableVirtualCores()))
.append("\",\"").append(ni.getNodeManagerVersion())
.append("\",\"");
// If opportunistic containers are enabled, add extra fields.
if (this.opportunisticContainersEnabled) {
nodeTableData
.append(String.valueOf(info.getNumRunningOpportContainers()))
.append("\",\"").append("<br title='")
.append(String.valueOf(info.getUsedMemoryOpport())).append("'>")
.append(StringUtils.byteDesc(info.getUsedMemoryOpport()))
.append("\",\"")
.append(String.valueOf(info.getUsedVirtualCoresOpport()))
.append("\",\"")
.append(String.valueOf(info.getNumQueuedContainers()))
.append("\",\"");
}
nodeTableData.append(ni.getNodeManagerVersion())
.append("\"],\n");
}
if (nodeTableData.charAt(nodeTableData.length() - 2) == ',') {

View File

@ -28,6 +28,7 @@ import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
@ -49,6 +50,10 @@ public class NodeInfo {
protected long availMemoryMB;
protected long usedVirtualCores;
protected long availableVirtualCores;
private int numRunningOpportContainers;
private long usedMemoryOpport; // Memory in bytes.
private long usedVirtualCoresOpport;
private int numQueuedContainers;
protected ArrayList<String> nodeLabels = new ArrayList<String>();
protected ResourceUtilizationInfo resourceUtilization;
@ -66,7 +71,8 @@ public class NodeInfo {
this.usedMemoryMB = report.getUsedResource().getMemorySize();
this.availMemoryMB = report.getAvailableResource().getMemorySize();
this.usedVirtualCores = report.getUsedResource().getVirtualCores();
this.availableVirtualCores = report.getAvailableResource().getVirtualCores();
this.availableVirtualCores =
report.getAvailableResource().getVirtualCores();
}
this.id = id.toString();
this.rack = ni.getRackName();
@ -77,6 +83,21 @@ public class NodeInfo {
this.healthReport = String.valueOf(ni.getHealthReport());
this.version = ni.getNodeManagerVersion();
// Status of opportunistic containers.
this.numRunningOpportContainers = 0;
this.usedMemoryOpport = 0;
this.usedVirtualCoresOpport = 0;
this.numQueuedContainers = 0;
OpportunisticContainersStatus opportStatus =
ni.getOpportunisticContainersStatus();
if (opportStatus != null) {
this.numRunningOpportContainers =
opportStatus.getRunningOpportContainers();
this.usedMemoryOpport = opportStatus.getOpportMemoryUsed();
this.usedVirtualCoresOpport = opportStatus.getOpportCoresUsed();
this.numQueuedContainers = opportStatus.getQueuedOpportContainers();
}
// add labels
Set<String> labelSet = ni.getNodeLabels();
if (labelSet != null) {
@ -140,6 +161,22 @@ public class NodeInfo {
return this.availableVirtualCores;
}
public int getNumRunningOpportContainers() {
return numRunningOpportContainers;
}
public long getUsedMemoryOpport() {
return usedMemoryOpport;
}
public long getUsedVirtualCoresOpport() {
return usedVirtualCoresOpport;
}
public int getNumQueuedContainers() {
return numQueuedContainers;
}
public ArrayList<String> getNodeLabels() {
return this.nodeLabels;
}

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@ -262,6 +262,11 @@ public class MockNodes {
return this.nodeUtilization;
}
@Override
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
return null;
}
@Override
public long getUntrackedTimeStamp() {
return 0;
@ -272,11 +277,6 @@ public class MockNodes {
}
@Override
public QueuedContainersStatus getQueuedContainersStatus() {
return null;
}
@Override
public Integer getDecommissioningTimeout() {
return null;

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.junit.Assert;
import org.junit.Test;
@ -183,13 +183,13 @@ public class TestNodeQueueLoadMonitor {
RMNode node1 = Mockito.mock(RMNode.class);
NodeId nID1 = new FakeNodeId(host, port);
Mockito.when(node1.getNodeID()).thenReturn(nID1);
QueuedContainersStatus status1 =
Mockito.mock(QueuedContainersStatus.class);
OpportunisticContainersStatus status1 =
Mockito.mock(OpportunisticContainersStatus.class);
Mockito.when(status1.getEstimatedQueueWaitTime())
.thenReturn(waitTime);
Mockito.when(status1.getWaitQueueLength())
.thenReturn(queueLength);
Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1);
Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1);
return node1;
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodesPage.NodesBlock;
@ -49,6 +50,7 @@ public class TestNodesPage {
// future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 23;
final int numberOfActualTableHeaders = 13;
private final int numberOfThForOpportunisticContainers = 4;
private Injector injector;
@ -135,4 +137,35 @@ public class TestNodesPage {
Mockito.verify(writer, Mockito.times(numberOfThInMetricsTable))
.print("<td");
}
@Test
public void testNodesBlockRenderForOpportunisticContainers() {
final RMContext mockRMContext =
TestRMWebApp.mockRMContext(3, numberOfRacks, numberOfNodesPerRack,
8 * TestRMWebApp.GiB);
mockRMContext.getYarnConfiguration().setBoolean(
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
injector =
WebAppTests.createMockInjector(RMContext.class, mockRMContext,
new Module() {
@Override
public void configure(Binder binder) {
try {
binder.bind(ResourceManager.class).toInstance(
TestRMWebApp.mockRm(mockRMContext));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
});
injector.getInstance(NodesBlock.class).render();
PrintWriter writer = injector.getInstance(PrintWriter.class);
WebAppTests.flushOutput(injector);
Mockito.verify(writer, Mockito.times(
numberOfActualTableHeaders + numberOfThInMetricsTable +
numberOfThForOpportunisticContainers)).print("<th");
Mockito.verify(writer, Mockito.times(numberOfThInMetricsTable))
.print("<td");
}
}

View File

@ -200,6 +200,7 @@ public class TestRMWebApp {
}
};
rmContext.setNodeLabelManager(new NullRMNodeLabelsManager());
rmContext.setYarnConfiguration(new YarnConfiguration());
return rmContext;
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -702,13 +703,17 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
"aggregatedContainersPhysicalMemoryMB"),
WebServicesTestUtils.getXmlInt(element,
"aggregatedContainersVirtualMemoryMB"),
WebServicesTestUtils.getXmlFloat(element, "containersCPUUsage"));
WebServicesTestUtils.getXmlFloat(element, "containersCPUUsage"),
WebServicesTestUtils.getXmlInt(element, "numRunningOpportContainers"),
WebServicesTestUtils.getXmlLong(element, "usedMemoryOpport"),
WebServicesTestUtils.getXmlInt(element, "usedVirtualCoresOpport"),
WebServicesTestUtils.getXmlInt(element, "numQueuedContainers"));
}
}
public void verifyNodeInfo(JSONObject nodeInfo, RMNode nm)
throws JSONException, Exception {
assertEquals("incorrect number of elements", 14, nodeInfo.length());
assertEquals("incorrect number of elements", 18, nodeInfo.length());
JSONObject resourceInfo = nodeInfo.getJSONObject("resourceUtilization");
verifyNodeInfoGeneric(nm, nodeInfo.getString("state"),
@ -725,21 +730,29 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
resourceInfo.getDouble("nodeCPUUsage"),
resourceInfo.getInt("aggregatedContainersPhysicalMemoryMB"),
resourceInfo.getInt("aggregatedContainersVirtualMemoryMB"),
resourceInfo.getDouble("containersCPUUsage"));
resourceInfo.getDouble("containersCPUUsage"),
nodeInfo.getInt("numRunningOpportContainers"),
nodeInfo.getLong("usedMemoryOpport"),
nodeInfo.getInt("usedVirtualCoresOpport"),
nodeInfo.getInt("numQueuedContainers"));
}
public void verifyNodeInfoGeneric(RMNode node, String state, String rack,
String id, String nodeHostName,
String nodeHTTPAddress, long lastHealthUpdate, String healthReport,
int numContainers, long usedMemoryMB, long availMemoryMB, long usedVirtualCores,
long availVirtualCores, String version, int nodePhysicalMemoryMB,
int nodeVirtualMemoryMB, double nodeCPUUsage,
int numContainers, long usedMemoryMB, long availMemoryMB,
long usedVirtualCores, long availVirtualCores, String version,
int nodePhysicalMemoryMB, int nodeVirtualMemoryMB, double nodeCPUUsage,
int containersPhysicalMemoryMB, int containersVirtualMemoryMB,
double containersCPUUsage)
double containersCPUUsage, int numRunningOpportContainers,
long usedMemoryOpport, int usedVirtualCoresOpport,
int numQueuedContainers)
throws JSONException, Exception {
ResourceScheduler sched = rm.getResourceScheduler();
SchedulerNodeReport report = sched.getNodeReport(node.getNodeID());
OpportunisticContainersStatus opportunisticStatus =
node.getOpportunisticContainersStatus();
WebServicesTestUtils.checkStringMatch("state", node.getState().toString(),
state);
@ -787,6 +800,20 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
assertEquals("availVirtualCores doesn't match: " + availVirtualCores, report
.getAvailableResource().getVirtualCores(), availVirtualCores);
}
if (opportunisticStatus != null) {
assertEquals("numRunningOpportContainers doesn't match: " +
numRunningOpportContainers,
opportunisticStatus.getRunningOpportContainers(),
numRunningOpportContainers);
assertEquals("usedMemoryOpport doesn't match: " + usedMemoryOpport,
opportunisticStatus.getOpportMemoryUsed(), usedMemoryOpport);
assertEquals(
"usedVirtualCoresOpport doesn't match: " + usedVirtualCoresOpport,
opportunisticStatus.getOpportCoresUsed(), usedVirtualCoresOpport);
assertEquals("numQueuedContainers doesn't match: " + numQueuedContainers,
opportunisticStatus.getQueuedOpportContainers(), numQueuedContainers);
}
}
}