YARN-4712. CPU Usage Metric is not captured properly in YARN-2928. (Naganarasimha G R via Varun Saxena)

This commit is contained in:
Varun Saxena 2016-03-18 23:19:18 +05:30 committed by Sangjin Lee
parent 9bdd455dce
commit 6f6cc647d6
3 changed files with 171 additions and 16 deletions

View File

@ -22,7 +22,6 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -32,11 +31,10 @@
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -47,6 +45,7 @@
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
public class ContainersMonitorImpl extends AbstractService implements
@ -576,8 +575,8 @@ && isProcessTreeOverLimit(containerId.toString(),
NMTimelinePublisher nmMetricsPublisher =
container.getNMTimelinePublisher();
if (nmMetricsPublisher != null) {
nmMetricsPublisher.reportContainerResourceUsage(container, pId,
currentPmemUsage, cpuUsageTotalCoresPercentage);
nmMetricsPublisher.reportContainerResourceUsage(container,
currentPmemUsage, cpuUsagePercentPerCore);
}
} catch (Exception e) {
// Log the exception and proceed to the next container.

View File

@ -113,29 +113,28 @@ protected void handleNMTimelineEvent(NMTimelineEvent event) {
}
@SuppressWarnings("unchecked")
public void reportContainerResourceUsage(Container container, String pId,
Long pmemUsage, Float cpuUsageTotalCoresPercentage) {
public void reportContainerResourceUsage(Container container, Long pmemUsage,
Float cpuUsagePercentPerCore) {
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
cpuUsageTotalCoresPercentage !=
ResourceCalculatorProcessTree.UNAVAILABLE) {
cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
ContainerEntity entity =
createContainerEntity(container.getContainerId());
long currentTimeMillis = System.currentTimeMillis();
if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
TimelineMetric memoryMetric = new TimelineMetric();
memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId);
memoryMetric.setId(ContainerMetric.MEMORY.toString());
memoryMetric.addValue(currentTimeMillis, pmemUsage);
entity.addMetric(memoryMetric);
}
if (cpuUsageTotalCoresPercentage !=
ResourceCalculatorProcessTree.UNAVAILABLE) {
if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
TimelineMetric cpuMetric = new TimelineMetric();
cpuMetric.setId(ContainerMetric.CPU.toString() + pId);
cpuMetric.addValue(currentTimeMillis, cpuUsageTotalCoresPercentage);
cpuMetric.setId(ContainerMetric.CPU.toString());
cpuMetric.addValue(currentTimeMillis,
Math.round(cpuUsagePercentPerCore));
entity.addMetric(cpuMetric);
}
dispatcher.getEventHandler().handle(
new TimelinePublishEvent(entity, container.getContainerId()
dispatcher.getEventHandler()
.handle(new TimelinePublishEvent(entity, container.getContainerId()
.getApplicationAttemptId().getApplicationId()));
}
}

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.junit.Assert;
import org.junit.Test;
public class TestNMTimelinePublisher {
private static final String MEMORY_ID = "MEMORY";
private static final String CPU_ID = "CPU";
@Test
public void testContainerResourceUsage() {
Context context = mock(Context.class);
@SuppressWarnings("unchecked")
ConcurrentMap<ApplicationId, Application> map = mock(ConcurrentMap.class);
Application aApp = mock(Application.class);
when(map.get(any(ApplicationId.class))).thenReturn(aApp);
DummyTimelineClient timelineClient = new DummyTimelineClient();
when(aApp.getTimelineClient()).thenReturn(timelineClient);
when(context.getApplications()).thenReturn(map);
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
when(context.getHttpPort()).thenReturn(0);
NMTimelinePublisher publisher = new NMTimelinePublisher(context);
publisher.init(new Configuration());
publisher.start();
Container aContainer = mock(Container.class);
when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1),
0L));
publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
timelineClient.reset();
publisher.reportContainerResourceUsage(aContainer, 1024L, 0.8F);
verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 1);
timelineClient.reset();
publisher.reportContainerResourceUsage(aContainer, 1024L, 0.49F);
verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 0);
timelineClient.reset();
publisher.reportContainerResourceUsage(aContainer, 1024L,
(float) ResourceCalculatorProcessTree.UNAVAILABLE);
verifyPublishedResourceUsageMetrics(timelineClient, 1024L,
ResourceCalculatorProcessTree.UNAVAILABLE);
publisher.stop();
}
private void verifyPublishedResourceUsageMetrics(
DummyTimelineClient timelineClient, long memoryUsage, int cpuUsage) {
TimelineEntity[] entities = null;
for (int i = 0; i < 10; i++) {
entities = timelineClient.getLastPublishedEntities();
if (entities != null) {
break;
}
try {
Thread.sleep(150L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
int numberOfResourceMetrics = 0;
numberOfResourceMetrics +=
(memoryUsage == ResourceCalculatorProcessTree.UNAVAILABLE) ? 0 : 1;
numberOfResourceMetrics +=
(cpuUsage == ResourceCalculatorProcessTree.UNAVAILABLE) ? 0 : 1;
assertNotNull("entities are expected to be published", entities);
assertEquals("Expected number of metrics notpublished",
numberOfResourceMetrics, entities[0].getMetrics().size());
Iterator<TimelineMetric> metrics = entities[0].getMetrics().iterator();
while (metrics.hasNext()) {
TimelineMetric metric = metrics.next();
Iterator<Entry<Long, Number>> entrySet;
switch (metric.getId()) {
case CPU_ID:
if (cpuUsage == ResourceCalculatorProcessTree.UNAVAILABLE) {
Assert.fail("Not Expecting CPU Metric to be published");
}
entrySet = metric.getValues().entrySet().iterator();
assertEquals("CPU usage metric not matching", cpuUsage,
entrySet.next().getValue());
break;
case MEMORY_ID:
if (memoryUsage == ResourceCalculatorProcessTree.UNAVAILABLE) {
Assert.fail("Not Expecting Memory Metric to be published");
}
entrySet = metric.getValues().entrySet().iterator();
assertEquals("Memory usage metric not matching", memoryUsage,
entrySet.next().getValue());
break;
default:
Assert.fail("Invalid Resource Usage metric");
break;
}
}
}
protected static class DummyTimelineClient extends TimelineClientImpl {
private TimelineEntity[] lastPublishedEntities;
@Override
public void putEntities(TimelineEntity... entities)
throws IOException, YarnException {
this.lastPublishedEntities = entities;
}
public TimelineEntity[] getLastPublishedEntities() {
return lastPublishedEntities;
}
public void reset() {
lastPublishedEntities = null;
}
}
}