YARN-3334. NM uses timeline client to publish container metrics to new timeline service. Contributed by Junping Du.

This commit is contained in:
Zhijie Shen 2015-04-06 09:31:24 -07:00 committed by Sangjin Lee
parent 42e49399ce
commit 5712b8f9fd
29 changed files with 344 additions and 115 deletions

View File

@ -58,7 +58,9 @@ public abstract class HierarchicalTimelineEntity extends TimelineEntity {
// required by JAXB
@InterfaceAudience.Private
@XmlElement(name = "children")
// comment out XmlElement here because it cause UnrecognizedPropertyException
// TODO we need a better fix
//@XmlElement(name = "children")
public HashMap<String, Set<String>> getChildrenJAXB() {
return children;
}

View File

@ -2750,6 +2750,14 @@ public class YarnConfiguration extends Configuration {
return clusterId;
}
public static boolean systemMetricsPublisherEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
&& conf.getBoolean(
YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
}
/* For debugging. mp configurations to system output as XML format. */
public static void main(String[] args) throws Exception {
new YarnConfiguration(new Configuration()).writeXml(System.out);

View File

@ -332,18 +332,7 @@ public class ApplicationMaster {
appMaster.run();
result = appMaster.finish();
threadPool.shutdown();
while (!threadPool.isTerminated()) { // wait for all posting thread to finish
try {
if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
threadPool.shutdownNow(); // send interrupt to hurry them along
}
} catch (InterruptedException e) {
LOG.warn("Timeline client service stop interrupted!");
break;
}
}
shutdownAndAwaitTermination();
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
LogManager.shutdown();
@ -358,6 +347,23 @@ public class ApplicationMaster {
}
}
//TODO remove threadPool after adding non-blocking call in TimelineClient
private static void shutdownAndAwaitTermination() {
threadPool.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
LOG.error("ThreadPool did not terminate");
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
/**
* Dump out contents of $CWD and the environment to stdout for debugging
*/

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -82,7 +83,10 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -140,6 +144,16 @@ public class TestDistributedShell {
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
conf.set("mapreduce.jobhistory.address",
"0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
// Enable ContainersMonitorImpl
conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
LinuxResourceCalculatorPlugin.class.getName());
conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
ProcfsBasedProcessTree.class.getName());
conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
true);
conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
// ATS version specific settings
if (timelineVersion == 1.0f) {
@ -470,15 +484,14 @@ public class TestDistributedShell {
File tmpRootFolder = new File(tmpRoot);
try {
Assert.assertTrue(tmpRootFolder.isDirectory());
// for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
String outputDirApp = tmpRoot +
String basePath = tmpRoot +
YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
UserGroupInformation.getCurrentUser().getShortUserName() +
(defaultFlow ? "/" +
TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
"/0/" : "/test_flow_id/12345678/") +
appId.toString() + "/DS_APP_ATTEMPT/";
"/0/" : "/test_flow_id/12345678/") + appId.toString();
// for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
String outputDirApp = basePath + "/DS_APP_ATTEMPT/";
File entityFolder = new File(outputDirApp);
Assert.assertTrue(entityFolder.isDirectory());
@ -491,13 +504,7 @@ public class TestDistributedShell {
File appAttemptFile = new File(appAttemptFileName);
Assert.assertTrue(appAttemptFile.exists());
String outputDirContainer = tmpRoot +
YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
UserGroupInformation.getCurrentUser().getShortUserName() +
(defaultFlow ? "/" +
TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
"/0/" : "/test_flow_id/12345678/") +
appId.toString() + "/DS_CONTAINER/";
String outputDirContainer = basePath + "/DS_CONTAINER/";
File containerFolder = new File(outputDirContainer);
Assert.assertTrue(containerFolder.isDirectory());
@ -509,6 +516,22 @@ public class TestDistributedShell {
Assert.assertTrue(containerFile.exists());
String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
+ "_";
// Verify NM posting container metrics info.
String outputDirContainerMetrics = basePath + "/" +
TimelineEntityType.YARN_CONTAINER + "/";
File containerMetricsFolder = new File(outputDirContainerMetrics);
Assert.assertTrue(containerMetricsFolder.isDirectory());
String containerMetricsTimestampFileName = "container_"
+ appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_01_000001.thist";
String containerMetricsFileName = outputDirContainerMetrics +
containerMetricsTimestampFileName;
File containerMetricsFile = new File(containerMetricsFileName);
Assert.assertTrue(containerMetricsFile.exists());
} finally {
FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
}

View File

@ -482,14 +482,11 @@ public class TimelineClientImpl extends TimelineClient {
}
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
String msg =
"Failed to get the response from the timeline server.";
String msg = "Response from the timeline server is " +
((resp == null) ? "null":
"not successful," + " HTTP error code: " + resp.getStatus()
+ ", Server response:\n" + resp.getEntity(String.class));
LOG.error(msg);
if (LOG.isDebugEnabled() && resp != null) {
String output = resp.getEntity(String.class);
LOG.debug("HTTP error code: " + resp.getStatus()
+ " Server response:\n" + output);
}
throw new YarnException(msg);
}
}

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -77,13 +78,6 @@ public interface Context {
*/
Map<ApplicationId, String> getRegisteredCollectors();
/**
* Return the known collectors which get from RM for all active applications
* running on this NM.
* @return known collectors.
*/
Map<ApplicationId, String> getKnownCollectors();
ConcurrentMap<ContainerId, Container> getContainers();
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
@ -107,6 +101,8 @@ public interface Context {
boolean getDecommissioned();
Configuration getConf();
void setDecommissioned(boolean isDecommissioned);
ConcurrentLinkedQueue<LogAggregationReport>

View File

@ -202,9 +202,10 @@ public class NodeManager extends CompositeService
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
NMStateStoreService stateStore, boolean isDistSchedulerEnabled) {
NMStateStoreService stateStore, boolean isDistSchedulerEnabled,
Configuration conf) {
return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled);
dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled, conf);
}
protected void doSecureLogin() throws IOException {
@ -337,7 +338,7 @@ public class NodeManager extends CompositeService
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
this.context = createNMContext(containerTokenSecretManager,
nmTokenSecretManager, nmStore, isDistSchedulingEnabled);
nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf);
nodeLabelsProvider = createNodeLabelsProvider(conf);
@ -466,6 +467,9 @@ public class NodeManager extends CompositeService
public static class NMContext implements Context {
private NodeId nodeId = null;
private Configuration conf = null;
protected final ConcurrentMap<ApplicationId, Application> applications =
new ConcurrentHashMap<ApplicationId, Application>();
@ -478,9 +482,6 @@ public class NodeManager extends CompositeService
protected Map<ApplicationId, String> registeredCollectors =
new ConcurrentHashMap<ApplicationId, String>();
protected Map<ApplicationId, String> knownCollectors =
new ConcurrentHashMap<ApplicationId, String>();
protected final ConcurrentMap<ContainerId,
org.apache.hadoop.yarn.api.records.Container> increasedContainers =
new ConcurrentHashMap<>();
@ -508,7 +509,8 @@ public class NodeManager extends CompositeService
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
NMStateStoreService stateStore, boolean isDistSchedulingEnabled) {
NMStateStoreService stateStore, boolean isDistSchedulingEnabled,
Configuration conf) {
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.dirsHandler = dirsHandler;
@ -521,6 +523,7 @@ public class NodeManager extends CompositeService
LogAggregationReport>();
this.queuingContext = new QueuingNMContext();
this.isDistSchedulingEnabled = isDistSchedulingEnabled;
this.conf = conf;
}
/**
@ -541,6 +544,11 @@ public class NodeManager extends CompositeService
return this.applications;
}
@Override
public Configuration getConf() {
return this.conf;
}
@Override
public ConcurrentMap<ContainerId, Container> getContainers() {
return this.containers;
@ -669,19 +677,6 @@ public class NodeManager extends CompositeService
public void addRegisteredCollectors(
Map<ApplicationId, String> newRegisteredCollectors) {
this.registeredCollectors.putAll(newRegisteredCollectors);
// Update to knownCollectors as well so it can immediately be consumed by
// this NM's TimelineClient.
this.knownCollectors.putAll(newRegisteredCollectors);
}
@Override
public Map<ApplicationId, String> getKnownCollectors() {
return this.knownCollectors;
}
public void addKnownCollectors(
Map<ApplicationId, String> knownCollectors) {
this.knownCollectors.putAll(knownCollectors);
}
}

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -81,6 +82,8 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
@ -906,10 +909,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
newResource.toString());
}
}
Map<ApplicationId, String> knownCollectors =
response.getAppCollectorsMap();
((NodeManager.NMContext)context).addKnownCollectors(knownCollectors);
if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
updateTimelineClientsAddress(response);
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
@ -938,6 +940,46 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
}
/**
* Caller should take care of sending non null nodelabels for both
* arguments
*
* @param nodeLabelsNew
* @param nodeLabelsOld
* @return if the New node labels are diff from the older one.
*/
private boolean areNodeLabelsUpdated(Set<NodeLabel> nodeLabelsNew,
Set<NodeLabel> nodeLabelsOld) {
if (nodeLabelsNew.size() != nodeLabelsOld.size()
|| !nodeLabelsOld.containsAll(nodeLabelsNew)) {
return true;
}
return false;
}
private void updateTimelineClientsAddress(
NodeHeartbeatResponse response) {
Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
response.getAppCollectorsMap().entrySet();
for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
ApplicationId appId = entry.getKey();
String collectorAddr = entry.getValue();
// Only handle applications running on local node.
// Not include apps with timeline collectors running in local
Application application = context.getApplications().get(appId);
if (application != null &&
!context.getRegisteredCollectors().containsKey(appId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sync a new collector address: " + collectorAddr +
" for application: " + appId + " from RM.");
}
TimelineClient client = application.getTimelineClient();
client.setTimelineServiceAddress(collectorAddr);
}
}
}
private void updateMasterKeys(NodeHeartbeatResponse response) {
// See if the master-key has rolled over
MasterKey updatedMasterKey = response.getContainerTokenMasterKey();

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -103,7 +104,15 @@ public class NMCollectorService extends CompositeService implements
Map<ApplicationId, String> newCollectorsMap =
new HashMap<ApplicationId, String>();
for (AppCollectorsMap collector : newCollectorsList) {
newCollectorsMap.put(collector.getApplicationId(), collector.getCollectorAddr());
ApplicationId appId = collector.getApplicationId();
String collectorAddr = collector.getCollectorAddr();
newCollectorsMap.put(appId, collectorAddr);
// set registered collector address to TimelineClient.
if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
TimelineClient client =
context.getApplications().get(appId).getTimelineClient();
client.setTimelineServiceAddress(collectorAddr);
}
}
((NodeManager.NMContext)context).addRegisteredCollectors(newCollectorsMap);
}

View File

@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -39,4 +40,6 @@ public interface Application extends EventHandler<ApplicationEvent> {
String getFlowRunId();
TimelineClient getTimelineClient();
}

View File

@ -30,6 +30,7 @@ import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -39,6 +40,8 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@ -80,6 +83,7 @@ public class ApplicationImpl implements Application {
private final ReadLock readLock;
private final WriteLock writeLock;
private final Context context;
private TimelineClient timelineClient;
private static final Log LOG = LogFactory.getLog(ApplicationImpl.class);
@ -122,6 +126,17 @@ public class ApplicationImpl implements Application {
Context context) {
this(dispatcher, user, flowId, flowRunId, appId, credentials,
context, -1);
Configuration conf = context.getConf();
if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
createAndStartTimelienClient(conf);
}
}
private void createAndStartTimelienClient(Configuration conf) {
// create and start timeline client
this.timelineClient = TimelineClient.createTimelineClient(appId);
timelineClient.init(conf);
timelineClient.start();
}
@Override
@ -134,6 +149,11 @@ public class ApplicationImpl implements Application {
return appId;
}
@Override
public TimelineClient getTimelineClient() {
return timelineClient;
}
@Override
public ApplicationState getApplicationState() {
this.readLock.lock();
@ -507,7 +527,11 @@ public class ApplicationImpl implements Application {
// TODO check we remove related collectors info in failure cases
// (YARN-3038)
app.context.getRegisteredCollectors().remove(app.getAppId());
app.context.getKnownCollectors().remove(app.getAppId());
// stop timelineClient when application get finished.
TimelineClient timelineClient = app.getTimelineClient();
if (timelineClient != null) {
timelineClient.stop();
}
}
}

View File

@ -18,32 +18,43 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ContainersMonitorImpl extends AbstractService implements
ContainersMonitor {
@ -76,11 +87,25 @@ public class ContainersMonitorImpl extends AbstractService implements
private boolean vmemCheckEnabled;
private boolean containersMonitorEnabled;
private boolean publishContainerMetricsToTimelineService;
private long maxVCoresAllottedForContainers;
private static final long UNKNOWN_MEMORY_LIMIT = -1L;
private int nodeCpuPercentageForYARN;
// For posting entities in new timeline service in a non-blocking way
// TODO replace with event loop in TimelineClient.
private static ExecutorService threadPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
@Private
public static enum ContainerMetric {
CPU, MEMORY
}
private ResourceUtilization containersUtilization;
// Tracks the aggregated allocation of the currently allocated containers
// when queuing of containers at the NMs is enabled.
@ -193,6 +218,18 @@ public class ContainersMonitorImpl extends AbstractService implements
1) + "). Thrashing might happen.");
}
}
publishContainerMetricsToTimelineService =
YarnConfiguration.systemMetricsPublisherEnabled(conf);
if (publishContainerMetricsToTimelineService) {
LOG.info("NodeManager has been configured to publish container " +
"metrics to Timeline Service V2.");
} else {
LOG.warn("NodeManager has not been configured to publish container " +
"metrics to Timeline Service V2.");
}
super.serviceInit(conf);
}
@ -235,9 +272,28 @@ public class ContainersMonitorImpl extends AbstractService implements
;
}
}
shutdownAndAwaitTermination();
super.serviceStop();
}
// TODO remove threadPool after adding non-blocking call in TimelineClient
private static void shutdownAndAwaitTermination() {
threadPool.shutdown();
try {
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
LOG.error("ThreadPool did not terminate");
}
} catch (InterruptedException ie) {
threadPool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
public static class ProcessTreeInfo {
private ContainerId containerId;
private String pid;
@ -413,6 +469,10 @@ public class ContainersMonitorImpl extends AbstractService implements
.entrySet()) {
ContainerId containerId = entry.getKey();
ProcessTreeInfo ptInfo = entry.getValue();
ContainerEntity entity = new ContainerEntity();
entity.setId(containerId.toString());
try {
String pId = ptInfo.getPID();
@ -427,7 +487,8 @@ public class ContainersMonitorImpl extends AbstractService implements
+ " for the first time");
ResourceCalculatorProcessTree pt =
ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf);
ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
pId, processTreeClass, conf);
ptInfo.setPid(pId);
ptInfo.setProcessTree(pt);
@ -451,6 +512,8 @@ public class ContainersMonitorImpl extends AbstractService implements
pTree.updateProcessTree(); // update process-tree
long currentVmemUsage = pTree.getVirtualMemorySize();
long currentPmemUsage = pTree.getRssMemorySize();
long currentTime = System.currentTimeMillis();
// if machine has 6 cores and 3 are used,
// cpuUsagePercentPerCore should be 300% and
// cpuUsageTotalCoresPercentage should be 50%
@ -503,6 +566,26 @@ public class ContainersMonitorImpl extends AbstractService implements
((int)cpuUsagePercentPerCore, milliVcoresUsed);
}
if (publishContainerMetricsToTimelineService) {
// if currentPmemUsage data is available
if (currentPmemUsage !=
ResourceCalculatorProcessTree.UNAVAILABLE) {
TimelineMetric memoryMetric = new TimelineMetric();
memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId);
memoryMetric.addTimeSeriesData(currentTime, currentPmemUsage);
entity.addMetric(memoryMetric);
}
// if cpuUsageTotalCoresPercentage data is available
if (cpuUsageTotalCoresPercentage !=
ResourceCalculatorProcessTree.UNAVAILABLE) {
TimelineMetric cpuMetric = new TimelineMetric();
cpuMetric.setId(ContainerMetric.CPU.toString() + pId);
cpuMetric.addTimeSeriesData(currentTime,
cpuUsageTotalCoresPercentage);
entity.addMetric(cpuMetric);
}
}
boolean isMemoryOverLimit = false;
String msg = "";
int containerExitStatus = ContainerExitStatus.INVALID;
@ -557,10 +640,23 @@ public class ContainersMonitorImpl extends AbstractService implements
trackingContainers.remove(containerId);
LOG.info("Removed ProcessTree with root " + pId);
}
} catch (Exception e) {
// Log the exception and proceed to the next container.
LOG.warn("Uncaught exception in ContainerMemoryManager "
+ "while managing memory of " + containerId, e);
LOG.warn("Uncaught exception in ContainersMonitorImpl "
+ "while monitoring resource of " + containerId, e);
}
if (publishContainerMetricsToTimelineService) {
try {
TimelineClient timelineClient = context.getApplications().get(
containerId.getApplicationAttemptId().getApplicationId()).
getTimelineClient();
putEntityWithoutBlocking(timelineClient, entity);
} catch (Exception e) {
LOG.error("Exception in ContainersMonitorImpl in putting " +
"resource usage metrics to timeline service.", e);
}
}
}
if (LOG.isDebugEnabled()) {
@ -585,6 +681,21 @@ public class ContainersMonitorImpl extends AbstractService implements
}
}
private void putEntityWithoutBlocking(final TimelineClient timelineClient,
final TimelineEntity entity) {
Runnable publishWrapper = new Runnable() {
public void run() {
try {
timelineClient.putEntities(entity);
} catch (IOException|YarnException e) {
LOG.error("putEntityNonBlocking get failed: " + e);
throw new RuntimeException(e.toString());
}
}
};
threadPool.execute(publishWrapper);
}
private String formatErrorMessage(String memTypeExceeded,
long currentVmemUsage, long vmemLimit,
long currentPmemUsage, long pmemLimit,

View File

@ -81,7 +81,7 @@ public class TestEventFlow {
Context context = new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(), null, null,
new NMNullStateStoreService(), false) {
new NMNullStateStoreService(), false, conf) {
@Override
public int getHttpPort() {
return 1234;

View File

@ -1704,9 +1704,10 @@ public class TestNodeStatusUpdater {
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
NMStateStoreService store, boolean isDistributedSchedulingEnabled) {
NMStateStoreService store, boolean isDistributedSchedulingEnabled,
Configuration conf) {
return new MyNMContext(containerTokenSecretManager,
nmTokenSecretManager);
nmTokenSecretManager, conf);
}
};
@ -1937,9 +1938,9 @@ public class TestNodeStatusUpdater {
public MyNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
NMTokenSecretManagerInNM nmTokenSecretManager, Configuration conf) {
super(containerTokenSecretManager, nmTokenSecretManager, null, null,
new NMNullStateStoreService(), false);
new NMNullStateStoreService(), false, conf);
}
@Override

View File

@ -622,11 +622,6 @@ public abstract class BaseAMRMProxyTest {
return null;
}
@Override
public Map<ApplicationId, String> getKnownCollectors() {
return null;
}
@Override
public ConcurrentMap<ContainerId, Container> getContainers() {
return null;
@ -677,6 +672,11 @@ public abstract class BaseAMRMProxyTest {
return false;
}
@Override
public Configuration getConf() {
return null;
}
@Override
public void setDecommissioned(boolean isDecommissioned) {
}

View File

@ -119,7 +119,8 @@ public abstract class BaseContainerManagerTest {
protected Configuration conf = new YarnConfiguration();
protected Context context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
conf) {
public int getHttpPort() {
return HTTP_PORT;
};

View File

@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.isA;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@ -559,7 +559,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
NMStateStoreService stateStore) {
NMContext context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), stateStore, false){
new ApplicationACLsManager(conf), stateStore, false, conf) {
public int getHttpPort() {
return HTTP_PORT;
}

View File

@ -113,7 +113,8 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
private static final String INVALID_JAVA_HOME = "/no/jvm/here";
protected Context distContext = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
conf) {
public int getHttpPort() {
return HTTP_PORT;
};

View File

@ -82,7 +82,7 @@ public class TestLocalCacheDirectoryManager {
new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService(),
false);
false, conf);
ResourceLocalizationService service =
new ResourceLocalizationService(null, null, null, null, nmContext);
try {

View File

@ -186,7 +186,8 @@ public class TestResourceLocalizationService {
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
nmContext = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
conf);
}
@After
@ -2369,7 +2370,7 @@ public class TestResourceLocalizationService {
NMContext nmContext =
new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), stateStore, false);
new ApplicationACLsManager(conf), stateStore, false, conf);
ResourceLocalizationService rawService =
new ResourceLocalizationService(dispatcher, exec, delService,
dirsHandler, nmContext);

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@ -41,7 +42,7 @@ public class MockApp implements Application {
Application app;
String flowId;
String flowRunId;
TimelineClient timelineClient = null;
public MockApp(int uniqId) {
this("mockUser", 1234, uniqId);
@ -87,4 +88,9 @@ public class MockApp implements Application {
public String getFlowRunId() {
return flowRunId;
}
@Override
public TimelineClient getTimelineClient() {
return timelineClient;
}
}

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.BufferedOutputStream;
import java.io.File;
@ -47,10 +47,9 @@ import org.apache.hadoop.util.NodeHealthScriptRunner;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -63,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@ -96,7 +96,8 @@ public class TestContainerLogsPage {
healthChecker.init(conf);
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
conf);
// Add an application and the corresponding containers
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf);
String user = "nobody";
@ -136,7 +137,8 @@ public class TestContainerLogsPage {
when(dirsHandlerForFullDisk.getLogDirsForRead()).
thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()}));
nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk,
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
conf);
nmContext.getApplications().put(appId, app);
container.setState(ContainerState.RUNNING);
nmContext.getContainers().put(container1, container);
@ -158,7 +160,8 @@ public class TestContainerLogsPage {
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
conf);
// Add an application and the corresponding containers
String user = "nobody";
long clusterTimeStamp = 1234;

View File

@ -63,7 +63,7 @@ public class TestNMAppsPage {
final NMContext nmcontext = new NMContext(
new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(),
null, new ApplicationACLsManager(conf), new NMNullStateStoreService(),
false);
false, conf);
Injector injector = WebAppTests.createMockInjector(NMContext.class,
nmcontext, new Module() {
@Override

View File

@ -86,8 +86,9 @@ public class TestNMWebServer {
}
private int startNMWebAppServer(String webAddr) {
Configuration conf = new Configuration();
Context nmContext = new NodeManager.NMContext(null, null, null, null,
null, false);
null, false, conf);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@ -110,7 +111,7 @@ public class TestNMWebServer {
return true;
}
};
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf);
@ -149,8 +150,9 @@ public class TestNMWebServer {
@Test
public void testNMWebApp() throws IOException, YarnException {
Configuration conf = new Configuration();
Context nmContext = new NodeManager.NMContext(null, null, null, null,
null, false);
null, false, conf);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@ -173,7 +175,7 @@ public class TestNMWebServer {
return true;
}
};
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf);

View File

@ -108,7 +108,7 @@ public class TestNMWebServices extends JerseyTestBase {
healthChecker.init(conf);
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler,
aclsManager, null, false);
aclsManager, null, false, conf);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {

View File

@ -102,7 +102,7 @@ public class TestNMWebServicesApps extends JerseyTestBase {
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler,
aclsManager, null, false);
aclsManager, null, false, conf);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {

View File

@ -136,7 +136,7 @@ public class TestNMWebServicesContainers extends JerseyTestBase {
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler,
aclsManager, null, false) {
aclsManager, null, false, conf) {
public NodeId getNodeId() {
return NodeId.newInstance("testhost.foo.com", 8042);
};

View File

@ -596,18 +596,16 @@ public class ResourceTrackerService extends AbstractService implements
Map<ApplicationId, String> liveAppCollectorsMap = new
ConcurrentHashMap<ApplicationId, String>();
Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
for (ApplicationId appId : liveApps) {
String appCollectorAddr = rmApps.get(appId).getCollectorAddr();
if (appCollectorAddr != null) {
liveAppCollectorsMap.put(appId, appCollectorAddr);
} else {
// Log a debug info if collector address is not found.
if (LOG.isDebugEnabled()) {
LOG.debug("Collector for applicaton: " + appId +
" hasn't registered yet!");
}
}
// Set collectors for all apps now.
// TODO set collectors for only active apps running on NM (liveApps cannot be
// used for this case)
for (Map.Entry<ApplicationId, RMApp> rmApp : rmApps.entrySet()) {
ApplicationId appId = rmApp.getKey();
String appCollectorAddr = rmApp.getValue().getCollectorAddr();
if (appCollectorAddr != null) {
liveAppCollectorsMap.put(appId, appCollectorAddr);
}
}
response.setAppCollectorsMap(liveAppCollectorsMap);
}

View File

@ -135,7 +135,7 @@ public class TimelineCollectorWebService {
}
TimelineCollector collector = getCollector(req, appId);
if (collector == null) {
LOG.error("Application not found");
LOG.error("Application: "+ appId + " is not found");
throw new NotFoundException(); // different exception?
}
collector.putEntities(entities, callerUgi);