diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d1960e611f0..19f08544bbb 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -131,6 +131,9 @@ Release 2.8.0 - UNRELEASED YARN-41. The RM should handle the graceful shutdown of the NM. (Devaraj K via junping_du) + YARN-1012. Report NM aggregated container resource utilization in heartbeat. + (Inigo Goiri via kasha) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 8167a58f8e6..c51570c7e73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -68,7 +68,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{ failoverThread = createAndStartFailoverThread(); NodeStatus status = NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, - null, null); + null, null, null); NodeHeartbeatRequest request2 = NodeHeartbeatRequest.newInstance(status, null, null,null); resourceTracker.nodeHeartbeat(request2); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index aad819d62e8..38b03816957 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -19,24 +19,48 @@ package org.apache.hadoop.yarn.server.api.records; import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.Records; - +/** + * {@code NodeStatus} is a summary of the status of the node. + *

+ * It includes information such as: + *

+ */ public abstract class NodeStatus { - + + /** + * Create a new {@code NodeStatus}. + * @param nodeId Identifier for this node. + * @param responseId Identifier for the response. + * @param containerStatuses Status of the containers running in this node. + * @param keepAliveApplications Applications to keep alive. + * @param nodeHealthStatus Health status of the node. + * @param containersUtilizations Utilization of the containers in this node. + * @return New {@code NodeStatus} with the provided information. + */ public static NodeStatus newInstance(NodeId nodeId, int responseId, List containerStatuses, List keepAliveApplications, - NodeHealthStatus nodeHealthStatus) { + NodeHealthStatus nodeHealthStatus, + ResourceUtilization containersUtilization) { NodeStatus nodeStatus = Records.newRecord(NodeStatus.class); nodeStatus.setResponseId(responseId); nodeStatus.setNodeId(nodeId); nodeStatus.setContainersStatuses(containerStatuses); nodeStatus.setKeepAliveApplications(keepAliveApplications); nodeStatus.setNodeHealthStatus(nodeHealthStatus); + nodeStatus.setContainersUtilization(containersUtilization); return nodeStatus; } @@ -55,4 +79,17 @@ public abstract class NodeStatus { public abstract void setNodeId(NodeId nodeId); public abstract void setResponseId(int responseId); + + /** + * Get the resource utilization of the containers. + * @return resource utilization of the containers + */ + @Public + @Stable + public abstract ResourceUtilization getContainersUtilization(); + + @Private + @Unstable + public abstract void setContainersUtilization( + ResourceUtilization containersUtilization); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceUtilization.java new file mode 100644 index 00000000000..39896a311e3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceUtilization.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * ResourceUtilization models the utilization of a set of computer + * resources in the cluster. + *

+ */ +@Private +@Evolving +public abstract class ResourceUtilization implements + Comparable { + public static ResourceUtilization newInstance(int pmem, int vmem, float cpu) { + ResourceUtilization utilization = + Records.newRecord(ResourceUtilization.class); + utilization.setPhysicalMemory(pmem); + utilization.setVirtualMemory(vmem); + utilization.setCPU(cpu); + return utilization; + } + + /** + * Get used virtual memory. + * + * @return virtual memory in MB + */ + public abstract int getVirtualMemory(); + + /** + * Set used virtual memory. + * + * @param vmem virtual memory in MB + */ + public abstract void setVirtualMemory(int vmem); + + /** + * Get physical memory. + * + * @return physical memory in MB + */ + public abstract int getPhysicalMemory(); + + /** + * Set physical memory. + * + * @param pmem physical memory in MB + */ + public abstract void setPhysicalMemory(int pmem); + + /** + * Get CPU utilization. + * + * @return CPU utilization normalized to 1 CPU + */ + public abstract float getCPU(); + + /** + * Set CPU utilization. + * + * @param cpu CPU utilization normalized to 1 CPU + */ + public abstract void setCPU(float cpu); + + @Override + public int hashCode() { + final int prime = 263167; + int result = 3571; + result = prime * result + getVirtualMemory(); + result = prime * result + getPhysicalMemory(); + result = 31 * result + Float.valueOf(getCPU()).hashCode(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof ResourceUtilization)) { + return false; + } + ResourceUtilization other = (ResourceUtilization) obj; + if (getVirtualMemory() != other.getVirtualMemory() + || getPhysicalMemory() != other.getPhysicalMemory() + || getCPU() != other.getCPU()) { + return false; + } + return true; + } + + @Override + public String toString() { + return ""; + } + + /** + * Add utilization to the current one. + * @param pmem Physical memory used to add. + * @param vmem Virtual memory used to add. + * @param cpu CPU utilization to add. + */ + public void addTo(int pmem, int vmem, float cpu) { + this.setPhysicalMemory(this.getPhysicalMemory() + pmem); + this.setVirtualMemory(this.getVirtualMemory() + vmem); + this.setCPU(this.getCPU() + cpu); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 65376dc659e..fffd6a9736c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -35,9 +35,10 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceUtilizationProto; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; - +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; public class NodeStatusPBImpl extends NodeStatus { NodeStatusProto proto = NodeStatusProto.getDefaultInstance(); @@ -291,6 +292,28 @@ public class NodeStatusPBImpl extends NodeStatus { this.nodeHealthStatus = healthStatus; } + @Override + public ResourceUtilization getContainersUtilization() { + NodeStatusProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + if (!p.hasContainersUtilization()) { + return null; + } + return convertFromProtoFormat(p.getContainersUtilization()); + } + + @Override + public void setContainersUtilization( + ResourceUtilization containersUtilization) { + maybeInitBuilder(); + if (containersUtilization == null) { + this.builder.clearContainersUtilization(); + return; + } + this.builder + .setContainersUtilization(convertToProtoFormat(containersUtilization)); + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } @@ -323,4 +346,13 @@ public class NodeStatusPBImpl extends NodeStatus { private ApplicationIdProto convertToProtoFormat(ApplicationId c) { return ((ApplicationIdPBImpl)c).getProto(); } + + private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) { + return ((ResourceUtilizationPBImpl) r).getProto(); + } + + private ResourceUtilizationPBImpl convertFromProtoFormat( + ResourceUtilizationProto p) { + return new ResourceUtilizationPBImpl(p); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java new file mode 100644 index 00000000000..01cda7ae882 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceUtilizationPBImpl.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceUtilizationProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceUtilizationProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; + +@Private +@Unstable +public class ResourceUtilizationPBImpl extends ResourceUtilization { + private ResourceUtilizationProto proto = ResourceUtilizationProto + .getDefaultInstance(); + private ResourceUtilizationProto.Builder builder = null; + private boolean viaProto = false; + + public ResourceUtilizationPBImpl() { + builder = ResourceUtilizationProto.newBuilder(); + } + + public ResourceUtilizationPBImpl(ResourceUtilizationProto proto) { + this.proto = proto; + viaProto = true; + } + + public ResourceUtilizationProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ResourceUtilizationProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getPhysicalMemory() { + ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder; + return (p.getPmem()); + } + + @Override + public void setPhysicalMemory(int pmem) { + maybeInitBuilder(); + builder.setPmem(pmem); + } + + @Override + public int getVirtualMemory() { + ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder; + return (p.getVmem()); + } + + @Override + public void setVirtualMemory(int vmem) { + maybeInitBuilder(); + builder.setPmem(vmem); + } + + @Override + public float getCPU() { + ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder; + return p.getCpu(); + } + + @Override + public void setCPU(float cpu) { + maybeInitBuilder(); + builder.setCpu(cpu); + } + + @Override + public int compareTo(ResourceUtilization other) { + int diff = this.getPhysicalMemory() - other.getPhysicalMemory(); + if (diff == 0) { + diff = this.getVirtualMemory() - other.getVirtualMemory(); + if (diff == 0) { + diff = Float.compare(this.getCPU(), other.getCPU()); + } + } + return diff; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/package-info.java new file mode 100644 index 00000000000..bf8497f683b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Server records. */ +package org.apache.hadoop.yarn.server.api.records; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 99149ac15e3..a810813a8f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -36,6 +36,7 @@ message NodeStatusProto { repeated ContainerStatusProto containersStatuses = 3; optional NodeHealthStatusProto nodeHealthStatus = 4; repeated ApplicationIdProto keep_alive_applications = 5; + optional ResourceUtilizationProto containers_utilization = 6; } message MasterKeyProto { @@ -52,4 +53,10 @@ message NodeHealthStatusProto { message VersionProto { optional int32 major_version = 1; optional int32 minor_version = 2; +} + +message ResourceUtilizationProto { + optional int32 pmem = 1; + optional int32 vmem = 2; + optional float cpu = 3; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3721b0e8a12..30a2bd50da9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -73,13 +73,14 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; -import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.YarnVersionInfo; @@ -429,13 +430,27 @@ public class NodeStatusUpdaterImpl extends AbstractService implements + ", " + nodeHealthStatus.getHealthReport()); } List containersStatuses = getContainerStatuses(); + ResourceUtilization containersUtilization = getContainersUtilization(); NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, containersStatuses, - createKeepAliveApplicationList(), nodeHealthStatus); + createKeepAliveApplicationList(), nodeHealthStatus, + containersUtilization); return nodeStatus; } + /** + * Get the aggregated utilization of the containers in this node. + * @return Resource utilization of all the containers. + */ + private ResourceUtilization getContainersUtilization() { + ContainerManagerImpl containerManager = + (ContainerManagerImpl) this.context.getContainerManager(); + ContainersMonitor containersMonitor = + containerManager.getContainersMonitor(); + return containersMonitor.getContainersUtilization(); + } + // Iterate through the NMContext and clone and get all the containers' // statuses. If it's a completed container, add into the // recentlyStoppedContainers collections. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index d3e2bf2c69f..f0dd2e70a42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -19,10 +19,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.nodemanager.ResourceView; public interface ContainersMonitor extends Service, EventHandler, ResourceView { - + public ResourceUtilization getContainersUtilization(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index b5f154d26a5..57d1bad353c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; @@ -78,6 +79,8 @@ public class ContainersMonitorImpl extends AbstractService implements private static final long UNKNOWN_MEMORY_LIMIT = -1L; private int nodeCpuPercentageForYARN; + private ResourceUtilization containersUtilization; + public ContainersMonitorImpl(ContainerExecutor exec, AsyncDispatcher dispatcher, Context context) { super("containers-monitor"); @@ -89,6 +92,8 @@ public class ContainersMonitorImpl extends AbstractService implements this.containersToBeAdded = new HashMap(); this.containersToBeRemoved = new ArrayList(); this.monitoringThread = new MonitoringThread(); + + this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); } @Override @@ -384,6 +389,11 @@ public class ContainersMonitorImpl extends AbstractService implements containersToBeRemoved.clear(); } + // Temporary structure to calculate the total resource utilization of + // the containers + ResourceUtilization trackedContainersUtilization = + ResourceUtilization.newInstance(0, 0, 0.0f); + // Now do the monitoring for the trackingContainers // Check memory usage and kill any overflowing containers long vmemUsageByAllContainers = 0; @@ -463,6 +473,12 @@ public class ContainersMonitorImpl extends AbstractService implements currentPmemUsage, pmemLimit)); } + // Add resource utilization for this container + trackedContainersUtilization.addTo( + (int) (currentPmemUsage >> 20), + (int) (currentVmemUsage >> 20), + milliVcoresUsed / 1000.0f); + // Add usage to container metrics if (containerMetricsEnabled) { ContainerMetrics.forContainer( @@ -542,6 +558,9 @@ public class ContainersMonitorImpl extends AbstractService implements + cpuUsagePercentPerCoreByAllContainers); } + // Save the aggregated utilization of the containers + setContainersUtilization(trackedContainersUtilization); + try { Thread.sleep(monitoringInterval); } catch (InterruptedException e) { @@ -613,6 +632,15 @@ public class ContainersMonitorImpl extends AbstractService implements return this.vmemCheckEnabled; } + @Override + public ResourceUtilization getContainersUtilization() { + return this.containersUtilization; + } + + public void setContainersUtilization(ResourceUtilization utilization) { + this.containersUtilization = utilization; + } + @Override public void handle(ContainersMonitorEvent monitoringEvent) {