YARN-3348. Add a 'yarn top' tool to help understand cluster usage. Contributed by Varun Vasudev

This commit is contained in:
Jian He 2015-04-10 10:40:31 -07:00
parent 3de0bf9a35
commit d3daf9665c
19 changed files with 1995 additions and 12 deletions

View File

@ -336,6 +336,15 @@ public class ResourceMgrDelegate extends YarnClient {
return client.getApplications(applicationTypes, applicationStates);
}
@Override
public List<ApplicationReport> getApplications(Set<String> queues,
Set<String> users, Set<String> applicationTypes,
EnumSet<YarnApplicationState> applicationStates) throws YarnException,
IOException {
return client.getApplications(queues, users, applicationTypes,
applicationStates);
}
@Override
public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
IOException {

View File

@ -15,6 +15,9 @@ Release 2.8.0 - UNRELEASED
YARN-1376. NM need to notify the log aggregation status to RM through
heartbeat. (Xuan Gong via junping_du)
YARN-3348. Add a 'yarn top' tool to help understand cluster usage. (Varun
Vasudev via jianhe)
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA

View File

@ -92,6 +92,7 @@ function print_usage(){
echo " cluster prints cluster information"
echo " daemonlog get/set the log level for each"
echo " daemon"
echo " top run cluster usage tool"
echo ""
echo "Most commands print help when invoked w/o parameters."
}
@ -295,6 +296,35 @@ elif [ "$COMMAND" = "daemonlog" ] ; then
elif [ "$COMMAND" = "cluster" ] ; then
CLASS=org.apache.hadoop.yarn.client.cli.ClusterCLI
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "top" ]; then
CLASS=org.apache.hadoop.yarn.client.cli.TopCLI
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
doNotSetCols=0
doNotSetRows=0
for i in "$@"; do
if [[ $i == "-cols" ]]; then
doNotSetCols=1
fi
if [[ $i == "-rows" ]]; then
doNotSetRows=1
fi
done
if [ $doNotSetCols == 0 ] && [ -n "${TERM}" ]; then
cols=$(tput cols)
if [ -n "$cols" ]; then
args=( $@ )
args=("${args[@]}" "-cols" "$cols")
set -- "${args[@]}"
fi
fi
if [ $doNotSetRows == 0 ] && [ -n "${TERM}" ]; then
rows=$(tput lines)
if [ -n "$rows" ]; then
args=( $@ )
args=("${args[@]}" "-rows" "$rows")
set -- "${args[@]}"
fi
fi
else
CLASS=$COMMAND
fi

View File

@ -55,7 +55,7 @@ public abstract class QueueInfo {
float maximumCapacity, float currentCapacity,
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression) {
String defaultNodeLabelExpression, QueueStatistics queueStatistics) {
QueueInfo queueInfo = Records.newRecord(QueueInfo.class);
queueInfo.setQueueName(queueName);
queueInfo.setCapacity(capacity);
@ -66,6 +66,7 @@ public abstract class QueueInfo {
queueInfo.setQueueState(queueState);
queueInfo.setAccessibleNodeLabels(accessibleNodeLabels);
queueInfo.setDefaultNodeLabelExpression(defaultNodeLabelExpression);
queueInfo.setQueueStatistics(queueStatistics);
return queueInfo;
}
@ -184,4 +185,24 @@ public abstract class QueueInfo {
@Stable
public abstract void setDefaultNodeLabelExpression(
String defaultLabelExpression);
/**
* Get the <code>queue stats</code> for the queue
*
* @return <code>queue stats</code> of the queue
*/
@Public
@Unstable
public abstract QueueStatistics getQueueStatistics();
/**
* Set the queue statistics for the queue
*
* @param queueStatistics
* the queue statistics
*/
@Public
@Unstable
public abstract void setQueueStatistics(QueueStatistics queueStatistics);
}

View File

@ -0,0 +1,279 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.util.Records;
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract class QueueStatistics {
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static QueueStatistics newInstance(long submitted, long running,
long pending, long completed, long killed, long failed, long activeUsers,
long availableMemoryMB, long allocatedMemoryMB, long pendingMemoryMB,
long reservedMemoryMB, long availableVCores, long allocatedVCores,
long pendingVCores, long reservedVCores) {
QueueStatistics statistics = Records.newRecord(QueueStatistics.class);
statistics.setNumAppsSubmitted(submitted);
statistics.setNumAppsRunning(running);
statistics.setNumAppsPending(pending);
statistics.setNumAppsCompleted(completed);
statistics.setNumAppsKilled(killed);
statistics.setNumAppsFailed(failed);
statistics.setNumActiveUsers(activeUsers);
statistics.setAvailableMemoryMB(availableMemoryMB);
statistics.setAllocatedMemoryMB(allocatedMemoryMB);
statistics.setPendingMemoryMB(pendingMemoryMB);
statistics.setReservedMemoryMB(reservedMemoryMB);
statistics.setAvailableVCores(availableVCores);
statistics.setAllocatedVCores(allocatedVCores);
statistics.setPendingVCores(pendingVCores);
statistics.setReservedVCores(reservedVCores);
return statistics;
}
/**
* Get the number of apps submitted
*
* @return the number of apps submitted
*/
public abstract long getNumAppsSubmitted();
/**
* Set the number of apps submitted
*
* @param numAppsSubmitted
* the number of apps submitted
*/
public abstract void setNumAppsSubmitted(long numAppsSubmitted);
/**
* Get the number of running apps
*
* @return the number of running apps
*/
public abstract long getNumAppsRunning();
/**
* Set the number of running apps
*
* @param numAppsRunning
* the number of running apps
*/
public abstract void setNumAppsRunning(long numAppsRunning);
/**
* Get the number of pending apps
*
* @return the number of pending apps
*/
public abstract long getNumAppsPending();
/**
* Set the number of pending apps
*
* @param numAppsPending
* the number of pending apps
*/
public abstract void setNumAppsPending(long numAppsPending);
/**
* Get the number of completed apps
*
* @return the number of completed apps
*/
public abstract long getNumAppsCompleted();
/**
* Set the number of completed apps
*
* @param numAppsCompleted
* the number of completed apps
*/
public abstract void setNumAppsCompleted(long numAppsCompleted);
/**
* Get the number of killed apps
*
* @return the number of killed apps
*/
public abstract long getNumAppsKilled();
/**
* Set the number of killed apps
*
* @param numAppsKilled
* the number of killed apps
*/
public abstract void setNumAppsKilled(long numAppsKilled);
/**
* Get the number of failed apps
*
* @return the number of failed apps
*/
public abstract long getNumAppsFailed();
/**
* Set the number of failed apps
*
* @param numAppsFailed
* the number of failed apps
*/
public abstract void setNumAppsFailed(long numAppsFailed);
/**
* Get the number of active users
*
* @return the number of active users
*/
public abstract long getNumActiveUsers();
/**
* Set the number of active users
*
* @param numActiveUsers
* the number of active users
*/
public abstract void setNumActiveUsers(long numActiveUsers);
/**
* Get the available memory in MB
*
* @return the available memory
*/
public abstract long getAvailableMemoryMB();
/**
* Set the available memory in MB
*
* @param availableMemoryMB
* the available memory
*/
public abstract void setAvailableMemoryMB(long availableMemoryMB);
/**
* Get the allocated memory in MB
*
* @return the allocated memory
*/
public abstract long getAllocatedMemoryMB();
/**
* Set the allocated memory in MB
*
* @param allocatedMemoryMB
* the allocate memory
*/
public abstract void setAllocatedMemoryMB(long allocatedMemoryMB);
/**
* Get the pending memory in MB
*
* @return the pending memory
*/
public abstract long getPendingMemoryMB();
/**
* Set the pending memory in MB
*
* @param pendingMemoryMB
* the pending memory
*/
public abstract void setPendingMemoryMB(long pendingMemoryMB);
/**
* Get the reserved memory in MB
*
* @return the reserved memory
*/
public abstract long getReservedMemoryMB();
/**
* Set the reserved memory in MB
*
* @param reservedMemoryMB
* the reserved memory
*/
public abstract void setReservedMemoryMB(long reservedMemoryMB);
/**
* Get the available vcores
*
* @return the available vcores
*/
public abstract long getAvailableVCores();
/**
* Set the available vcores
*
* @param availableVCores
* the available vcores
*/
public abstract void setAvailableVCores(long availableVCores);
/**
* Get the allocated vcores
*
* @return the allocated vcores
*/
public abstract long getAllocatedVCores();
/**
* Set the allocated vcores
*
* @param allocatedVCores
* the allocated vcores
*/
public abstract void setAllocatedVCores(long allocatedVCores);
/**
* Get the pending vcores
*
* @return the pending vcores
*/
public abstract long getPendingVCores();
/**
* Set the pending vcores
*
* @param pendingVCores
* the pending vcores
*/
public abstract void setPendingVCores(long pendingVCores);
/**
* Get the reserved vcores
*
* @return the reserved vcores
*/
public abstract long getReservedVCores();
/**
* Set the reserved vcores
*
* @param reservedVCores
* the reserved vcores
*/
public abstract void setReservedVCores(long reservedVCores);
}

View File

@ -53,4 +53,70 @@ public abstract class YarnClusterMetrics {
@Unstable
public abstract void setNumNodeManagers(int numNodeManagers);
/**
* Get the number of <code>DecommissionedNodeManager</code>s in the cluster.
*
* @return number of <code>DecommissionedNodeManager</code>s in the cluster
*/
@Public
@Unstable
public abstract int getNumDecommissionedNodeManagers();
@Private
@Unstable
public abstract void setNumDecommissionedNodeManagers(
int numDecommissionedNodeManagers);
/**
* Get the number of <code>ActiveNodeManager</code>s in the cluster.
*
* @return number of <code>ActiveNodeManager</code>s in the cluster
*/
@Public
@Unstable
public abstract int getNumActiveNodeManagers();
@Private
@Unstable
public abstract void setNumActiveNodeManagers(int numActiveNodeManagers);
/**
* Get the number of <code>LostNodeManager</code>s in the cluster.
*
* @return number of <code>LostNodeManager</code>s in the cluster
*/
@Public
@Unstable
public abstract int getNumLostNodeManagers();
@Private
@Unstable
public abstract void setNumLostNodeManagers(int numLostNodeManagers);
/**
* Get the number of <code>UnhealthyNodeManager</code>s in the cluster.
*
* @return number of <code>UnhealthyNodeManager</code>s in the cluster
*/
@Public
@Unstable
public abstract int getNumUnhealthyNodeManagers();
@Private
@Unstable
public abstract void setNumUnhealthyNodeManagers(int numUnhealthNodeManagers);
/**
* Get the number of <code>RebootedNodeManager</code>s in the cluster.
*
* @return number of <code>RebootedNodeManager</code>s in the cluster
*/
@Public
@Unstable
public abstract int getNumRebootedNodeManagers();
@Private
@Unstable
public abstract void setNumRebootedNodeManagers(int numRebootedNodeManagers);
}

View File

@ -339,6 +339,11 @@ message ApplicationACLMapProto {
message YarnClusterMetricsProto {
optional int32 num_node_managers = 1;
optional int32 num_decommissioned_nms = 2;
optional int32 num_active_nms = 3;
optional int32 num_lost_nms = 4;
optional int32 num_unhealthy_nms = 5;
optional int32 num_rebooted_nms = 6;
}
enum QueueStateProto {
@ -346,6 +351,24 @@ enum QueueStateProto {
Q_RUNNING = 2;
}
message QueueStatisticsProto {
optional int64 numAppsSubmitted = 1;
optional int64 numAppsRunning = 2;
optional int64 numAppsPending = 3;
optional int64 numAppsCompleted = 4;
optional int64 numAppsKilled = 5;
optional int64 numAppsFailed = 6;
optional int64 numActiveUsers = 7;
optional int64 availableMemoryMB = 8;
optional int64 allocatedMemoryMB = 9;
optional int64 pendingMemoryMB = 10;
optional int64 reservedMemoryMB = 11;
optional int64 availableVCores = 12;
optional int64 allocatedVCores = 13;
optional int64 pendingVCores = 14;
optional int64 reservedVCores = 15;
}
message QueueInfoProto {
optional string queueName = 1;
optional float capacity = 2;
@ -356,6 +379,7 @@ message QueueInfoProto {
repeated ApplicationReportProto applications = 7;
repeated string accessibleNodeLabels = 8;
optional string defaultNodeLabelExpression = 9;
optional QueueStatisticsProto queueStatistics = 10;
}
enum QueueACLProto {

View File

@ -237,7 +237,7 @@ public abstract class YarnClient extends AbstractService {
* {@link #getApplicationReport(ApplicationId)}.
* </p>
*
* @param applicationTypes
* @param applicationTypes set of application types you are interested in
* @return a list of reports of applications
* @throws YarnException
* @throws IOException
@ -257,7 +257,7 @@ public abstract class YarnClient extends AbstractService {
* {@link #getApplicationReport(ApplicationId)}.
* </p>
*
* @param applicationStates
* @param applicationStates set of application states you are interested in
* @return a list of reports of applications
* @throws YarnException
* @throws IOException
@ -278,8 +278,8 @@ public abstract class YarnClient extends AbstractService {
* {@link #getApplicationReport(ApplicationId)}.
* </p>
*
* @param applicationTypes
* @param applicationStates
* @param applicationTypes set of application types you are interested in
* @param applicationStates set of application states you are interested in
* @return a list of reports of applications
* @throws YarnException
* @throws IOException
@ -289,6 +289,32 @@ public abstract class YarnClient extends AbstractService {
EnumSet<YarnApplicationState> applicationStates) throws YarnException,
IOException;
/**
* <p>
* Get a report (ApplicationReport) of Applications matching the given users,
* queues, application types and application states in the cluster. If any of
* the params is set to null, it is not used when filtering.
* </p>
*
* <p>
* If the user does not have <code>VIEW_APP</code> access for an application
* then the corresponding report will be filtered as described in
* {@link #getApplicationReport(ApplicationId)}.
* </p>
*
* @param queues set of queues you are interested in
* @param users set of users you are interested in
* @param applicationTypes set of application types you are interested in
* @param applicationStates set of application states you are interested in
* @return a list of reports of applications
* @throws YarnException
* @throws IOException
*/
public abstract List<ApplicationReport> getApplications(Set<String> queues,
Set<String> users, Set<String> applicationTypes,
EnumSet<YarnApplicationState> applicationStates) throws YarnException,
IOException;
/**
* <p>
* Get metrics ({@link YarnClusterMetrics}) about the cluster.
@ -426,7 +452,7 @@ public abstract class YarnClient extends AbstractService {
* Get a report of all (ApplicationAttempts) of Application in the cluster.
* </p>
*
* @param applicationId
* @param applicationId application id of the app
* @return a list of reports for all application attempts for specified
* application.
* @throws YarnException
@ -460,7 +486,7 @@ public abstract class YarnClient extends AbstractService {
* Get a report of all (Containers) of ApplicationAttempt in the cluster.
* </p>
*
* @param applicationAttemptId
* @param applicationAttemptId application attempt id
* @return a list of reports of all containers for specified application
* attempts
* @throws YarnException

View File

@ -484,6 +484,19 @@ public class YarnClientImpl extends YarnClient {
return response.getApplicationList();
}
@Override
public List<ApplicationReport> getApplications(Set<String> queues,
Set<String> users, Set<String> applicationTypes,
EnumSet<YarnApplicationState> applicationStates) throws YarnException,
IOException {
GetApplicationsRequest request =
GetApplicationsRequest.newInstance(applicationTypes, applicationStates);
request.setQueues(queues);
request.setUsers(users);
GetApplicationsResponse response = rmClient.getApplications(request);
return response.getApplicationList();
}
@Override
public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
IOException {

View File

@ -685,7 +685,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
public QueueInfo createFakeQueueInfo() {
return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
createFakeAppReports(), QueueState.RUNNING, null, null);
createFakeAppReports(), QueueState.RUNNING, null, null, null);
}
public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {

View File

@ -1268,7 +1268,7 @@ public class TestYarnCLI {
nodeLabels.add("GPU");
nodeLabels.add("JDK_7");
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, nodeLabels, "GPU");
null, null, QueueState.RUNNING, nodeLabels, "GPU", null);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
@ -1292,7 +1292,7 @@ public class TestYarnCLI {
public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
QueueCLI cli = createAndGetQueueCLI();
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, null, null);
null, null, QueueState.RUNNING, null, null, null);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);

View File

@ -29,10 +29,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueInfoProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueInfoProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueStatisticsProto;
import com.google.protobuf.TextFormat;
@ -368,4 +370,29 @@ public class QueueInfoPBImpl extends QueueInfo {
}
builder.setDefaultNodeLabelExpression(defaultNodeLabelExpression);
}
private QueueStatistics convertFromProtoFormat(QueueStatisticsProto q) {
return new QueueStatisticsPBImpl(q);
}
private QueueStatisticsProto convertToProtoFormat(QueueStatistics q) {
return ((QueueStatisticsPBImpl) q).getProto();
}
@Override
public QueueStatistics getQueueStatistics() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasQueueStatistics()) ? convertFromProtoFormat(p
.getQueueStatistics()) : null;
}
@Override
public void setQueueStatistics(QueueStatistics queueStatistics) {
maybeInitBuilder();
if (queueStatistics == null) {
builder.clearQueueStatistics();
return;
}
builder.setQueueStatistics(convertToProtoFormat(queueStatistics));
}
}

View File

@ -0,0 +1,257 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueStatisticsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueStatisticsProtoOrBuilder;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class QueueStatisticsPBImpl extends QueueStatistics {
QueueStatisticsProto proto = QueueStatisticsProto.getDefaultInstance();
QueueStatisticsProto.Builder builder = null;
boolean viaProto = false;
public QueueStatisticsPBImpl() {
builder = QueueStatisticsProto.newBuilder();
}
public QueueStatisticsPBImpl(QueueStatisticsProto proto) {
this.proto = proto;
viaProto = true;
}
public QueueStatisticsProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = QueueStatisticsProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public long getNumAppsSubmitted() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasNumAppsSubmitted()) ? p.getNumAppsSubmitted() : -1;
}
@Override
public void setNumAppsSubmitted(long numAppsSubmitted) {
maybeInitBuilder();
builder.setNumAppsSubmitted(numAppsSubmitted);
}
@Override
public long getNumAppsRunning() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasNumAppsRunning()) ? p.getNumAppsRunning() : -1;
}
@Override
public void setNumAppsRunning(long numAppsRunning) {
maybeInitBuilder();
builder.setNumAppsRunning(numAppsRunning);
}
@Override
public long getNumAppsPending() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasNumAppsPending()) ? p.getNumAppsPending() : -1;
}
@Override
public void setNumAppsPending(long numAppsPending) {
maybeInitBuilder();
builder.setNumAppsPending(numAppsPending);
}
@Override
public long getNumAppsCompleted() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasNumAppsCompleted()) ? p.getNumAppsCompleted() : -1;
}
@Override
public void setNumAppsCompleted(long numAppsCompleted) {
maybeInitBuilder();
builder.setNumAppsCompleted(numAppsCompleted);
}
@Override
public long getNumAppsKilled() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasNumAppsKilled()) ? p.getNumAppsKilled() : -1;
}
@Override
public void setNumAppsKilled(long numAppsKilled) {
maybeInitBuilder();
builder.setNumAppsKilled(numAppsKilled);
}
@Override
public long getNumAppsFailed() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasNumAppsFailed()) ? p.getNumAppsFailed() : -1;
}
@Override
public void setNumAppsFailed(long numAppsFailed) {
maybeInitBuilder();
builder.setNumAppsFailed(numAppsFailed);
}
@Override
public long getNumActiveUsers() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasNumActiveUsers()) ? p.getNumActiveUsers() : -1;
}
@Override
public void setNumActiveUsers(long numActiveUsers) {
maybeInitBuilder();
builder.setNumActiveUsers(numActiveUsers);
}
@Override
public long getAvailableMemoryMB() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasAvailableMemoryMB()) ? p.getAvailableMemoryMB() : -1;
}
@Override
public void setAvailableMemoryMB(long availableMemoryMB) {
maybeInitBuilder();
builder.setAvailableMemoryMB(availableMemoryMB);
}
@Override
public long getAllocatedMemoryMB() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasAllocatedMemoryMB()) ? p.getAllocatedMemoryMB() : -1;
}
@Override
public void setAllocatedMemoryMB(long allocatedMemoryMB) {
maybeInitBuilder();
builder.setAllocatedMemoryMB(allocatedMemoryMB);
}
@Override
public long getPendingMemoryMB() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasPendingMemoryMB()) ? p.getPendingMemoryMB() : -1;
}
@Override
public void setPendingMemoryMB(long pendingMemoryMB) {
maybeInitBuilder();
builder.setPendingMemoryMB(pendingMemoryMB);
}
@Override
public long getReservedMemoryMB() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasReservedMemoryMB()) ? p.getReservedMemoryMB() : -1;
}
@Override
public void setReservedMemoryMB(long reservedMemoryMB) {
maybeInitBuilder();
builder.setReservedMemoryMB(reservedMemoryMB);
}
@Override
public long getAvailableVCores() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasAvailableVCores()) ? p.getAvailableVCores() : -1;
}
@Override
public void setAvailableVCores(long availableVCores) {
maybeInitBuilder();
builder.setAvailableVCores(availableVCores);
}
@Override
public long getAllocatedVCores() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasAllocatedVCores()) ? p.getAllocatedVCores() : -1;
}
@Override
public void setAllocatedVCores(long allocatedVCores) {
maybeInitBuilder();
builder.setAllocatedVCores(allocatedVCores);
}
@Override
public long getPendingVCores() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasPendingVCores()) ? p.getPendingVCores() : -1;
}
@Override
public void setPendingVCores(long pendingVCores) {
maybeInitBuilder();
builder.setPendingVCores(pendingVCores);
}
@Override
public long getReservedVCores() {
QueueStatisticsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasReservedVCores()) ? p.getReservedVCores() : -1;
}
@Override
public void setReservedVCores(long reservedVCores) {
maybeInitBuilder();
builder.setReservedVCores(reservedVCores);
}
}

View File

@ -89,6 +89,80 @@ public class YarnClusterMetricsPBImpl extends YarnClusterMetrics {
builder.setNumNodeManagers((numNodeManagers));
}
@Override
public int getNumDecommissionedNodeManagers() {
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
if (p.hasNumDecommissionedNms()) {
return (p.getNumDecommissionedNms());
}
return 0;
}
@Override
public void
setNumDecommissionedNodeManagers(int numDecommissionedNodeManagers) {
maybeInitBuilder();
builder.setNumDecommissionedNms((numDecommissionedNodeManagers));
}
@Override
public int getNumActiveNodeManagers() {
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
if (p.hasNumActiveNms()) {
return (p.getNumActiveNms());
}
return 0;
}
@Override
public void setNumActiveNodeManagers(int numActiveNodeManagers) {
maybeInitBuilder();
builder.setNumActiveNms((numActiveNodeManagers));
}
@Override
public int getNumLostNodeManagers() {
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
if (p.hasNumLostNms()) {
return (p.getNumLostNms());
}
return 0;
}
@Override
public void setNumLostNodeManagers(int numLostNodeManagers) {
maybeInitBuilder();
builder.setNumLostNms((numLostNodeManagers));
}
@Override
public int getNumUnhealthyNodeManagers() {
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
if (p.hasNumUnhealthyNms()) {
return (p.getNumUnhealthyNms());
}
return 0;
}
@Override
public void setNumUnhealthyNodeManagers(int numUnhealthyNodeManagers) {
maybeInitBuilder();
builder.setNumUnhealthyNms((numUnhealthyNodeManagers));
}
@Override
public int getNumRebootedNodeManagers() {
YarnClusterMetricsProtoOrBuilder p = viaProto ? proto : builder;
if (p.hasNumRebootedNms()) {
return (p.getNumRebootedNms());
}
return 0;
}
@Override
public void setNumRebootedNodeManagers(int numRebootedNodeManagers) {
maybeInitBuilder();
builder.setNumRebootedNms((numRebootedNodeManagers));
}
}

View File

@ -130,6 +130,7 @@ import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -482,7 +483,8 @@ public class TestPBImplRecords {
// it is recursive(has sub queues)
typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f,
1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"),
"x && y"));
"x && y", null));
generateByNewInstance(QueueStatistics.class);
generateByNewInstance(QueueUserACLInfo.class);
generateByNewInstance(YarnClusterMetrics.class);
// for reservation system

View File

@ -659,6 +659,13 @@ public class ClientRMService extends AbstractService implements
YarnClusterMetrics ymetrics = recordFactory
.newRecordInstance(YarnClusterMetrics.class);
ymetrics.setNumNodeManagers(this.rmContext.getRMNodes().size());
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
ymetrics.setNumDecommissionedNodeManagers(clusterMetrics
.getNumDecommisionedNMs());
ymetrics.setNumActiveNodeManagers(clusterMetrics.getNumActiveNMs());
ymetrics.setNumLostNodeManagers(clusterMetrics.getNumLostNMs());
ymetrics.setNumUnhealthyNodeManagers(clusterMetrics.getUnhealthyNMs());
ymetrics.setNumRebootedNodeManagers(clusterMetrics.getNumRebootedNMs());
response.setClusterMetrics(ymetrics);
return response;
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -306,9 +307,31 @@ public abstract class AbstractCSQueue implements CSQueue {
queueInfo.setQueueState(state);
queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
queueInfo.setCurrentCapacity(getUsedCapacity());
queueInfo.setQueueStatistics(getQueueStatistics());
return queueInfo;
}
public QueueStatistics getQueueStatistics() {
QueueStatistics stats =
recordFactory.newRecordInstance(QueueStatistics.class);
stats.setNumAppsSubmitted(getMetrics().getAppsSubmitted());
stats.setNumAppsRunning(getMetrics().getAppsRunning());
stats.setNumAppsPending(getMetrics().getAppsPending());
stats.setNumAppsCompleted(getMetrics().getAppsCompleted());
stats.setNumAppsKilled(getMetrics().getAppsKilled());
stats.setNumAppsFailed(getMetrics().getAppsFailed());
stats.setNumActiveUsers(getMetrics().getActiveUsers());
stats.setAvailableMemoryMB(getMetrics().getAvailableMB());
stats.setAllocatedMemoryMB(getMetrics().getAllocatedMB());
stats.setPendingMemoryMB(getMetrics().getPendingMB());
stats.setReservedMemoryMB(getMetrics().getReservedMB());
stats.setAvailableVCores(getMetrics().getAvailableVirtualCores());
stats.setAllocatedVCores(getMetrics().getAllocatedVirtualCores());
stats.setPendingVCores(getMetrics().getPendingVirtualCores());
stats.setReservedVCores(getMetrics().getReservedVirtualCores());
return stats;
}
@Private
public synchronized Resource getMaximumAllocation() {
return maximumAllocation;

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -152,9 +153,31 @@ public abstract class FSQueue implements Queue, Schedulable {
}
queueInfo.setChildQueues(childQueueInfos);
queueInfo.setQueueState(QueueState.RUNNING);
queueInfo.setQueueStatistics(getQueueStatistics());
return queueInfo;
}
public QueueStatistics getQueueStatistics() {
QueueStatistics stats =
recordFactory.newRecordInstance(QueueStatistics.class);
stats.setNumAppsSubmitted(getMetrics().getAppsSubmitted());
stats.setNumAppsRunning(getMetrics().getAppsRunning());
stats.setNumAppsPending(getMetrics().getAppsPending());
stats.setNumAppsCompleted(getMetrics().getAppsCompleted());
stats.setNumAppsKilled(getMetrics().getAppsKilled());
stats.setNumAppsFailed(getMetrics().getAppsFailed());
stats.setNumActiveUsers(getMetrics().getActiveUsers());
stats.setAvailableMemoryMB(getMetrics().getAvailableMB());
stats.setAllocatedMemoryMB(getMetrics().getAllocatedMB());
stats.setPendingMemoryMB(getMetrics().getPendingMB());
stats.setReservedMemoryMB(getMetrics().getReservedMB());
stats.setAvailableVCores(getMetrics().getAvailableVirtualCores());
stats.setAllocatedVCores(getMetrics().getAllocatedVirtualCores());
stats.setPendingVCores(getMetrics().getPendingVirtualCores());
stats.setReservedVCores(getMetrics().getReservedVirtualCores());
return stats;
}
@Override
public FSQueueMetrics getMetrics() {
return metrics;