YARN-1690. Made DistributedShell send timeline entities+events. Contributed by Mayank Bansal.

svn merge --ignore-ancestry -c 1579123 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1579124 13f79535-47bb-0310-9956-ffa450edef68
Zhijie Shen 2014-03-19 03:51:27 +00:00
parent 65fd55768d
commit e39ccf3b24
5 changed files with 222 additions and 4 deletions
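In short: the DistributedShell ApplicationMaster now creates a TimelineClient during init() and publishes a TimelineEntity carrying one TimelineEvent at each lifecycle point (app attempt start/end, container start/end), while MiniYARNCluster gains an embedded ApplicationHistoryServer so TestDistributedShell can read the entities back. A condensed sketch of the publishing pattern, assembled only from calls that appear in the diff below (the standalone framing is illustrative, not part of the patch):

    // Condensed from ApplicationMaster below; variable names follow the patch.
    TimelineClient timelineClient = TimelineClient.createTimelineClient();
    timelineClient.init(conf);   // the AM's Configuration
    timelineClient.start();

    // One TimelineEntity per app attempt (or container), one event per lifecycle point.
    TimelineEntity entity = new TimelineEntity();
    entity.setEntityId(appAttemptID.toString());
    entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
    entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser().toString());

    TimelineEvent event = new TimelineEvent();
    event.setEventType(DSEvent.DS_APP_ATTEMPT_START.toString());
    event.setTimestamp(System.currentTimeMillis());
    entity.addEvent(event);

    timelineClient.putEntities(entity);   // throws IOException/YarnException on failure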

hadoop-yarn-project/CHANGES.txt

@@ -160,6 +160,9 @@ Release 2.4.0 - UNRELEASED
YARN-1705. Reset cluster-metrics on transition to standby. (Rohith via kasha)
YARN-1690. Made DistributedShell send timeline entities+events. (Mayank Bansal
via zjshen)
IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -45,6 +45,7 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -55,6 +56,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -80,7 +82,10 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
@@ -90,6 +95,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.LogManager;
import com.google.common.annotations.VisibleForTesting;
@@ -160,6 +166,18 @@ public class ApplicationMaster {
private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
@VisibleForTesting
@Private
public static enum DSEvent {
DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
}
@VisibleForTesting
@Private
public static enum DSEntity {
DS_APP_ATTEMPT, DS_CONTAINER
}
// Configuration
private Configuration conf;
@@ -242,6 +260,9 @@ public class ApplicationMaster {
// Launch threads
private List<Thread> launchThreads = new ArrayList<Thread>();
// Timeline Client
private TimelineClient timelineClient;
private final String linux_bash_command = "bash";
private final String windows_command = "cmd /c";
@@ -261,7 +282,8 @@
result = appMaster.finish();
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
- System.exit(1);
+ LogManager.shutdown();
+ ExitUtil.terminate(1, t);
}
if (result) {
LOG.info("Application Master completed successfully. exiting");
@@ -316,7 +338,6 @@
* @throws IOException
*/
public boolean init(String[] args) throws ParseException, IOException {
Options opts = new Options();
opts.addOption("app_attempt_id", true,
"App Attempt ID. Not to be used unless for testing purposes");
@@ -464,6 +485,11 @@
requestPriority = Integer.parseInt(cliParser
.getOptionValue("priority", "0"));
// Creating the Timeline Client
timelineClient = TimelineClient.createTimelineClient();
timelineClient.init(conf);
timelineClient.start();
return true;
}
@@ -485,6 +511,13 @@
@SuppressWarnings({ "unchecked" })
public void run() throws YarnException, IOException {
LOG.info("Starting ApplicationMaster");
try {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_START);
} catch (Exception e) {
LOG.error("App Attempt start event coud not be pulished for "
+ appAttemptID.toString(), e);
}
Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials();
@@ -564,6 +597,13 @@
amRMClient.addContainerRequest(containerAsk);
}
numRequestedContainers.set(numTotalContainersToRequest);
try {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
DSEvent.DS_APP_ATTEMPT_END);
} catch (Exception e) {
LOG.error("App Attempt start event coud not be pulished for "
+ appAttemptID.toString(), e);
}
}
@VisibleForTesting
@@ -668,6 +708,12 @@
LOG.info("Container completed successfully." + ", containerId="
+ containerStatus.getContainerId());
}
try {
publishContainerEndEvent(timelineClient, containerStatus);
} catch (Exception e) {
LOG.error("Container start event could not be pulished for "
+ containerStatus.getContainerId().toString(), e);
}
}
// ask for more containers if any failed
@@ -782,6 +828,13 @@
if (container != null) {
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
}
try {
ApplicationMaster.publishContainerStartEvent(
applicationMaster.timelineClient, container);
} catch (Exception e) {
LOG.error("Container start event coud not be pulished for "
+ container.getId().toString(), e);
}
}
@Override
@@ -968,4 +1021,54 @@
org.apache.commons.io.IOUtils.closeQuietly(ds);
}
}
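// The three helpers below each build a TimelineEntity keyed by app attempt or
// container id, attach a single lifecycle TimelineEvent, and push it to the
// timeline server via TimelineClient#putEntities.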
private static void publishContainerStartEvent(TimelineClient timelineClient,
Container container) throws IOException, YarnException {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
.toString());
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_START.toString());
event.addEventInfo("Node", container.getNodeId().toString());
event.addEventInfo("Resources", container.getResource().toString());
entity.addEvent(event);
timelineClient.putEntities(entity);
}
private static void publishContainerEndEvent(TimelineClient timelineClient,
ContainerStatus container) throws IOException, YarnException {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId(container.getContainerId().toString());
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
.toString());
TimelineEvent event = new TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setEventType(DSEvent.DS_CONTAINER_END.toString());
event.addEventInfo("State", container.getState().name());
event.addEventInfo("Exit Status", container.getExitStatus());
entity.addEvent(event);
timelineClient.putEntities(entity);
}
private static void publishApplicationAttemptEvent(
TimelineClient timelineClient, String appAttemptId, DSEvent appEvent)
throws IOException, YarnException {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId(appAttemptId);
entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
.toString());
TimelineEvent event = new TimelineEvent();
event.setEventType(appEvent.toString());
event.setTimestamp(System.currentTimeMillis());
entity.addEvent(event);
timelineClient.putEntities(entity);
}
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -36,11 +36,14 @@ import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -68,6 +71,7 @@ public class TestDistributedShell {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
conf.setClass(YarnConfiguration.RM_SCHEDULER,
FifoScheduler.class, ResourceScheduler.class);
conf.set("yarn.log.dir", "target");
if (yarnCluster == null) {
yarnCluster = new MiniYARNCluster(
TestDistributedShell.class.getSimpleName(), 1, 1, 1);
@@ -92,6 +96,12 @@
os.write(bytesOut.toByteArray());
os.close();
}
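// Wipe any leveldb timeline store left behind by an earlier run so the test starts empty.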
FileContext fsContext = FileContext.getLocalFSFileContext();
fsContext.delete(new Path(
conf.get("yarn.timeline-service.leveldb-timeline-store.path")), true);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
@@ -108,6 +118,12 @@
yarnCluster = null;
}
}
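// Remove the leveldb timeline store created during the test.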
FileContext fsContext = FileContext.getLocalFSFileContext();
fsContext.delete(new Path(
conf.get("yarn.timeline-service.leveldb-timeline-store.path")), true);
}
@Test(timeout=90000)
@@ -171,7 +187,27 @@
t.join();
LOG.info("Client run completed. Result=" + result);
Assert.assertTrue(result.get());
TimelineEntities entitiesAttempts = yarnCluster
.getApplicationHistoryServer()
.getTimelineStore()
.getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
null, null, null, null, null, null);
Assert.assertNotNull(entitiesAttempts);
Assert.assertEquals(1, entitiesAttempts.getEntities().size());
Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
.size());
Assert.assertEquals(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
entitiesAttempts.getEntities().get(0).getEntityType());
TimelineEntities entities = yarnCluster
.getApplicationHistoryServer()
.getTimelineStore()
.getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
null, null, null, null, null);
Assert.assertNotNull(entities);
Assert.assertEquals(2, entities.getEntities().size());
Assert.assertEquals(ApplicationMaster.DSEntity.DS_CONTAINER.toString(),
entities.getEntities().get(0).getEntityType());
}
@Test(timeout=90000)

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java

@@ -169,5 +169,12 @@ public class ApplicationHistoryServer extends CompositeService {
throw new YarnRuntimeException(msg, e);
}
}
/**
* @return the TimelineStore used by this history server
*/
@Private
@VisibleForTesting
public TimelineStore getTimelineStore() {
return timelineStore;
}
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
@@ -100,6 +101,9 @@ public class MiniYARNCluster extends CompositeService {
private ResourceManager[] resourceManagers;
private String[] rmIds;
private ApplicationHistoryServer appHistoryServer;
private ApplicationHistoryServerWrapper appHistoryServerWrapper;
private boolean useFixedPorts;
private boolean useRpc = false;
private int failoverTimeout;
@@ -241,6 +245,8 @@
addService(new NodeManagerWrapper(index));
}
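// In addition to the RM and NMs, the mini cluster now brings up an
// ApplicationHistoryServer, which hosts the timeline service.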
addService(new ApplicationHistoryServerWrapper());
super.serviceInit(
conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
}
@@ -649,4 +655,67 @@
}
return false;
}
private class ApplicationHistoryServerWrapper extends AbstractService {
public ApplicationHistoryServerWrapper() {
super(ApplicationHistoryServerWrapper.class.getName());
}
@Override
protected synchronized void serviceInit(Configuration conf)
throws Exception {
if (!conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS);
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
}
appHistoryServer = new ApplicationHistoryServer();
appHistoryServer.init(conf);
super.serviceInit(conf);
}
@Override
protected synchronized void serviceStart() throws Exception {
try {
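// appHistoryServer.start() runs on a separate thread; the loop below polls
// until the service leaves INITED (up to 60 x 1.5s = 90s).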
new Thread() {
@Override
public void run() {
appHistoryServer.start();
}
}.start();
int waitCount = 0;
while (appHistoryServer.getServiceState() == STATE.INITED
&& waitCount++ < 60) {
LOG.info("Waiting for Timeline Server to start...");
Thread.sleep(1500);
}
if (appHistoryServer.getServiceState() != STATE.STARTED) {
// AHS could have failed.
throw new IOException(
"ApplicationHistoryServer failed to start. Final state is "
+ appHistoryServer.getServiceState());
}
super.serviceStart();
} catch (Throwable t) {
throw new YarnRuntimeException(t);
}
LOG.info("MiniYARN ApplicationHistoryServer address: "
+ getConfig().get(YarnConfiguration.TIMELINE_SERVICE_ADDRESS));
LOG.info("MiniYARN ApplicationHistoryServer web address: "
+ getConfig().get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS));
}
@Override
protected synchronized void serviceStop() throws Exception {
if (appHistoryServer != null) {
appHistoryServer.stop();
}
super.serviceStop();
}
}
public ApplicationHistoryServer getApplicationHistoryServer() {
return this.appHistoryServer;
}
}