MAPREDUCE-2989. Modified JobHistory to link to task and AM logs from the JobHistoryServer. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1190174 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2011-10-28 06:45:04 +00:00
parent 47c28ce14d
commit 670fa24b48
62 changed files with 1700 additions and 385 deletions

View File

@ -441,6 +441,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2736. Remove unused contrib components dependent on MR1. (eli)
MAPREDUCE-2989. Modified JobHistory to link to task and AM logs from the
JobHistoryServer. (Siddharth Seth via vinodkv)
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -22,7 +22,6 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
import java.util.List;
@ -85,7 +84,6 @@ import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanerImpl;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -142,6 +140,7 @@ public class MRAppMaster extends CompositeService {
private final ApplicationAttemptId appAttemptID;
private final ContainerId containerID;
private final String nmHost;
private final int nmPort;
private final int nmHttpPort;
protected final MRAppMetrics metrics;
private Set<TaskId> completedTasksFromPreviousRun;
@ -168,14 +167,15 @@ public class MRAppMaster extends CompositeService {
private UserGroupInformation currentUser; // Will be setup during init
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmHttpPort, long appSubmitTime) {
this(applicationAttemptId, containerId, nmHost, nmHttpPort,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime) {
this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime);
}
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmHttpPort, Clock clock,
long appSubmitTime) {
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
Clock clock, long appSubmitTime) {
super(MRAppMaster.class.getName());
this.clock = clock;
this.startTime = clock.getTime();
@ -183,6 +183,7 @@ public class MRAppMaster extends CompositeService {
this.appAttemptID = applicationAttemptId;
this.containerID = containerId;
this.nmHost = nmHost;
this.nmPort = nmPort;
this.nmHttpPort = nmHttpPort;
this.metrics = MRAppMetrics.create();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
@ -757,7 +758,8 @@ public class MRAppMaster extends CompositeService {
amInfos = new LinkedList<AMInfo>();
}
AMInfo amInfo =
new AMInfo(appAttemptID, startTime, containerID, nmHost, nmHttpPort);
new AMInfo(appAttemptID, startTime, containerID, nmHost, nmPort,
nmHttpPort);
amInfos.add(amInfo);
// /////////////////// Create the job itself.
@ -770,7 +772,8 @@ public class MRAppMaster extends CompositeService {
dispatcher.getEventHandler().handle(
new JobHistoryEvent(job.getID(), new AMStartedEvent(info
.getAppAttemptId(), info.getStartTime(), info.getContainerId(),
info.getNodeManagerHost(), info.getNodeManagerHttpPort())));
info.getNodeManagerHost(), info.getNodeManagerPort(), info
.getNodeManagerHttpPort())));
}
// metrics system init is really init & start.
@ -872,41 +875,44 @@ public class MRAppMaster extends CompositeService {
}
}
private static void validateInputParam(String value, String param)
throws IOException {
if (value == null) {
String msg = param + " is null";
LOG.error(msg);
throw new IOException(msg);
}
}
public static void main(String[] args) {
try {
String containerIdStr =
System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
String nodeHttpAddressStr =
System.getenv(ApplicationConstants.NM_HTTP_ADDRESS_ENV);
String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV);
String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV);
String nodeHttpPortString =
System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV);
String appSubmitTimeStr =
System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
if (containerIdStr == null) {
String msg = ApplicationConstants.AM_CONTAINER_ID_ENV + " is null";
LOG.error(msg);
throw new IOException(msg);
}
if (nodeHttpAddressStr == null) {
String msg = ApplicationConstants.NM_HTTP_ADDRESS_ENV + " is null";
LOG.error(msg);
throw new IOException(msg);
}
if (appSubmitTimeStr == null) {
String msg = ApplicationConstants.APP_SUBMIT_TIME_ENV + " is null";
LOG.error(msg);
throw new IOException(msg);
}
validateInputParam(containerIdStr,
ApplicationConstants.AM_CONTAINER_ID_ENV);
validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV);
validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV);
validateInputParam(nodeHttpPortString,
ApplicationConstants.NM_HTTP_PORT_ENV);
validateInputParam(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV);
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationAttemptId applicationAttemptId =
containerId.getApplicationAttemptId();
InetSocketAddress nodeHttpInetAddr =
NetUtils.createSocketAddr(nodeHttpAddressStr);
long appSubmitTime = Long.parseLong(appSubmitTimeStr);
MRAppMaster appMaster =
new MRAppMaster(applicationAttemptId, containerId,
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
appSubmitTime);
new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString), appSubmitTime);
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(appMaster));
YarnConfiguration conf = new YarnConfiguration(new JobConf());
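The refactored main() above reads each NodeManager coordinate from its own environment variable and fails fast through validateInputParam() when one is missing. The same pattern could be reused for any required variable; a minimal hedged sketch (MY_REQUIRED_ENV is a hypothetical name, not part of this commit):

    // Hedged sketch of the fail-fast validation pattern factored out above.
    String value = System.getenv("MY_REQUIRED_ENV"); // hypothetical variable
    if (value == null) {
      String msg = "MY_REQUIRED_ENV is null";
      LOG.error(msg);
      throw new IOException(msg);
    }
    int port = Integer.parseInt(value); // a bad number surfaces early as NumberFormatException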

View File

@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
/*
* Recovers the completed tasks from the previous life of Application Master.
@ -393,9 +394,8 @@ public class RecoveryService extends CompositeService implements Recovery {
TaskAttemptInfo attemptInfo) {
LOG.info("Sending assigned event to " + yarnAttemptID);
ContainerId cId = attemptInfo.getContainerId();
String[] splits = attemptInfo.getHostname().split(":");
NodeId nodeId = BuilderUtils.newNodeId(splits[0], Integer
.parseInt(splits[1]));
NodeId nodeId = ConverterUtils.toNodeId(attemptInfo.getHostname());
// Resource/Priority/ApplicationACLs are only needed while launching the
// container on an NM, these are already completed tasks, so setting them
// to null

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.mapreduce.v2.app.webapp;
import java.util.List;
import com.google.inject.Inject;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
@ -47,6 +50,10 @@ public class NavBlock extends HtmlBlock {
li().a(url("app"), "Jobs")._()._();
if (app.getJob() != null) {
String jobid = MRApps.toString(app.getJob().getID());
List<AMInfo> amInfos = app.getJob().getAMInfos();
AMInfo thisAmInfo = amInfos.get(amInfos.size()-1);
String nodeHttpAddress = thisAmInfo.getNodeManagerHost() + ":"
+ thisAmInfo.getNodeManagerHttpPort();
nav.
h3("Job").
ul().
@ -54,7 +61,11 @@ public class NavBlock extends HtmlBlock {
li().a(url("jobcounters", jobid), "Counters")._().
li().a(url("conf", jobid), "Configuration")._().
li().a(url("tasks", jobid, "m"), "Map tasks")._().
li().a(url("tasks", jobid, "r"), "Reduce tasks")._()._();
li().a(url("tasks", jobid, "r"), "Reduce tasks")._().
li().a(".logslink", url("http://", nodeHttpAddress, "node",
"containerlogs", thisAmInfo.getContainerId().toString(),
app.getJob().getUserName()),
"AM Logs")._()._();
if (app.getTask() != null) {
String taskid = MRApps.toString(app.getTask().getID());
nav.

View File

@ -86,7 +86,7 @@ public class TaskPage extends AppView {
String containerIdStr = ConverterUtils.toString(containerId);
nodeTd._(" ").
a(".logslink", url("http://", nodeHttpAddr, "node", "containerlogs",
containerIdStr), "logs");
containerIdStr, app.getJob().getUserName()), "logs");
}
nodeTd._().
td(".ts", Times.format(startTime)).

View File

@ -136,7 +136,7 @@ public class MRApp extends MRAppMaster {
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) {
super(getApplicationAttemptId(applicationId, startCount), getContainerId(
applicationId, startCount), "testhost", 3333, System.currentTimeMillis());
applicationId, startCount), "testhost", 2222, 3333, System.currentTimeMillis());
this.testWorkDir = new File("target", testName);
testAbsPath = new Path(testWorkDir.getAbsolutePath());
LOG.info("PathUsed: " + testAbsPath);

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
public class MockJobs extends MockApps {
@ -492,8 +494,21 @@ public class MockJobs extends MockApps {
@Override
public List<AMInfo> getAMInfos() {
throw new UnsupportedOperationException("Not supported yet.");
List<AMInfo> amInfoList = new LinkedList<AMInfo>();
amInfoList.add(createAMInfo(1));
amInfoList.add(createAMInfo(2));
return amInfoList;
}
};
}
private static AMInfo createAMInfo(int attempt) {
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(100, 1), attempt);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
return new AMInfo(appAttemptId, System.currentTimeMillis(), containerId,
"testhost", 2222, 3333);
}
}

View File

@ -44,7 +44,7 @@ public class TestMRAppMaster {
.toApplicationAttemptId(applicationAttemptIdStr);
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMasterTest appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1,
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis());
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
@ -60,8 +60,9 @@ class MRAppMasterTest extends MRAppMaster {
private Configuration conf;
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, long submitTime) {
super(applicationAttemptId, containerId, host, port, submitTime);
ContainerId containerId, String host, int port, int httpPort,
long submitTime) {
super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
}
@Override

View File

@ -222,6 +222,7 @@ public class TestRecovery {
Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
.getApplicationAttemptId());
Assert.assertEquals("testhost", amInfo.getNodeManagerHost());
Assert.assertEquals(2222, amInfo.getNodeManagerPort());
Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort());
}
long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();

View File

@ -83,7 +83,7 @@ import org.junit.Test;
public TestMRApp(ApplicationAttemptId applicationAttemptId) {
super(applicationAttemptId, BuilderUtils.newContainerId(
applicationAttemptId, 1), "testhost", 3333, System
applicationAttemptId, 1), "testhost", 2222, 3333, System
.currentTimeMillis());
}

View File

@ -75,6 +75,7 @@
{"name": "startTime", "type": "long"},
{"name": "containerId", "type": "string"},
{"name": "nodeManagerHost", "type": "string"},
{"name": "nodeManagerPort", "type": "int"},
{"name": "nodeManagerHttpPort", "type": "int"}
]
},

View File

@ -354,7 +354,7 @@ class JobSubmitter {
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);

View File

@ -46,15 +46,19 @@ public class AMStartedEvent implements HistoryEvent {
* the containerId of the AM.
* @param nodeManagerHost
* the node on which the AM is running.
* @param nodeManagerPort
the RPC port of the node manager running the AM.
* @param nodeManagerHttpPort
* the httpPort for the node running the AM.
*/
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerHttpPort) {
ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
int nodeManagerHttpPort) {
datum.applicationAttemptId = new Utf8(appAttemptId.toString());
datum.startTime = startTime;
datum.containerId = new Utf8(containerId.toString());
datum.nodeManagerHost = new Utf8(nodeManagerHost);
datum.nodeManagerPort = nodeManagerPort;
datum.nodeManagerHttpPort = nodeManagerHttpPort;
}
@ -98,6 +102,13 @@ public class AMStartedEvent implements HistoryEvent {
return datum.nodeManagerHost.toString();
}
/**
* @return the node manager port.
*/
public int getNodeManagerPort() {
return datum.nodeManagerPort;
}
/**
* @return the http port of the node manager running the AM.
*/
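With the extra port, an AMStartedEvent now carries enough to rebuild both the RPC and HTTP addresses of the AM's node. A hedged construction sketch, reusing the BuilderUtils helpers that this commit's tests use (all ids and ports are placeholders):

    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
        BuilderUtils.newApplicationId(100, 1), 1);
    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
    AMStartedEvent amStarted = new AMStartedEvent(appAttemptId,
        System.currentTimeMillis(), containerId, "testhost", 2222, 3333);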

View File

@ -320,6 +320,7 @@ public class JobHistoryParser {
amInfo.startTime = event.getStartTime();
amInfo.containerId = event.getContainerId();
amInfo.nodeManagerHost = event.getNodeManagerHost();
amInfo.nodeManagerPort = event.getNodeManagerPort();
amInfo.nodeManagerHttpPort = event.getNodeManagerHttpPort();
if (info.amInfos == null) {
info.amInfos = new LinkedList<AMInfo>();
@ -613,6 +614,7 @@ public class JobHistoryParser {
long startTime;
ContainerId containerId;
String nodeManagerHost;
int nodeManagerPort;
int nodeManagerHttpPort;
/**
@ -626,11 +628,13 @@ public class JobHistoryParser {
}
public AMInfo(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerHttpPort) {
ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
int nodeManagerHttpPort) {
this.appAttemptId = appAttemptId;
this.startTime = startTime;
this.containerId = containerId;
this.nodeManagerHost = nodeManagerHost;
this.nodeManagerPort = nodeManagerPort;
this.nodeManagerHttpPort = nodeManagerHttpPort;
}
@ -642,6 +646,7 @@ public class JobHistoryParser {
System.out.println("START_TIME: " + startTime);
System.out.println("CONTAINER_ID: " + containerId.toString());
System.out.println("NODE_MANAGER_HOST: " + nodeManagerHost);
System.out.println("NODE_MANAGER_PORT: " + nodeManagerPort);
System.out.println("NODE_MANAGER_HTTP_PORT: " + nodeManagerHttpPort);
}
@ -665,6 +670,11 @@ public class JobHistoryParser {
return nodeManagerHost;
}
/** @return the port for the node manager running the AM */
public int getNodeManagerPort() {
return nodeManagerPort;
}
/** @return the http port for the node manager running the AM */
public int getNodeManagerHttpPort() {
return nodeManagerHttpPort;
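Downstream consumers can then rebuild both NodeManager addresses from parsed history; a hedged sketch (jobInfo is assumed to come from JobHistoryParser.parse()):

    AMInfo am = jobInfo.getAMInfos().get(0);
    String rpcAddress  = am.getNodeManagerHost() + ":" + am.getNodeManagerPort();
    String httpAddress = am.getNodeManagerHost() + ":" + am.getNodeManagerHttpPort();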

View File

@ -66,12 +66,9 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
import org.apache.hadoop.mapreduce.v2.hs.webapp.HsWebApp;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.security.client.ClientHSSecurityInfo;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -144,7 +141,7 @@ public class HistoryClientService extends AbstractService {
webApp = new HsWebApp(history);
String bindAddress = conf.get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
WebApps.$for("jobhistory", this).at(bindAddress).start(webApp);
WebApps.$for("jobhistory", this).with(conf).at(bindAddress).start(webApp);
}
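The added .with(conf) makes the Configuration injectable into the webapp's controllers and views (HsController later in this commit receives it via @Inject). The same chain spelled out, as a hedged sketch where historyClientService stands in for `this` and the bind address is the default JobHistoryServer webapp address:

    WebApps.$for("jobhistory", historyClientService)
        .with(conf)                    // expose Configuration to the web layer
        .at("0.0.0.0:19888")           // illustrative bind address
        .start(new HsWebApp(history));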
@Override

View File

@ -42,11 +42,6 @@ public class JobHistoryServer extends CompositeService {
private HistoryClientService clientService;
private JobHistory jobHistoryService;
static{
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
}
public JobHistoryServer() {
super(JobHistoryServer.class.getName());
}

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.app.webapp.App;
import org.apache.hadoop.mapreduce.v2.app.webapp.AppController;
import org.apache.hadoop.yarn.server.nodemanager.webapp.AggregatedLogsPage;
import org.apache.hadoop.yarn.webapp.View;
import com.google.inject.Inject;
@ -32,6 +33,7 @@ import com.google.inject.Inject;
*/
public class HsController extends AppController {
@Inject HsController(App app, Configuration conf, RequestContext ctx) {
super(app, conf, ctx, "History");
}
@ -169,6 +171,20 @@ public class HsController extends AppController {
render(aboutPage());
}
/**
* Render the logs page.
*/
public void logs() {
render(HsLogsPage.class);
}
/**
* Render the nm logs page.
*/
public void nmlogs() {
render(AggregatedLogsPage.class);
}
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.v2.app.webapp.AppController#singleCounterPage()

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@ -37,8 +38,13 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.ResponseInfo;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import org.apache.hadoop.yarn.webapp.view.InfoBlock;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
@ -86,7 +92,7 @@ public class HsJobBlock extends HtmlBlock {
return;
}
Map<JobACL, AccessControlList> acls = job.getJobACLs();
List<AMInfo> amInfos = job.getAMInfos();
JobReport jobReport = job.getReport();
int mapTasks = job.getTotalMaps();
int mapTasksComplete = job.getCompletedMaps();
@ -105,6 +111,9 @@ public class HsJobBlock extends HtmlBlock {
_("Elapsed:", StringUtils.formatTime(
Times.elapsed(startTime, finishTime, false)));
String amString =
amInfos.size() == 1 ? "ApplicationMaster" : "ApplicationMasters";
List<String> diagnostics = job.getDiagnostics();
if(diagnostics != null && !diagnostics.isEmpty()) {
StringBuffer b = new StringBuffer();
@ -127,10 +136,44 @@ public class HsJobBlock extends HtmlBlock {
infoBlock._("ACL "+entry.getKey().getAclName()+":",
entry.getValue().getAclString());
}
html.
DIV<Hamlet> div = html.
_(InfoBlock.class).
div(_INFO_WRAP).
div(_INFO_WRAP);
// MRAppMasters Table
TABLE<DIV<Hamlet>> table = div.table("#job");
table.
tr().
th(amString).
_().
tr().
th(_TH, "Attempt Number").
th(_TH, "Start Time").
th(_TH, "Node").
th(_TH, "Logs").
_();
for (AMInfo amInfo : amInfos) {
String nodeHttpAddress = amInfo.getNodeManagerHost() +
":" + amInfo.getNodeManagerHttpPort();
NodeId nodeId = BuilderUtils.newNodeId(
amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort());
table.tr().
td(String.valueOf(amInfo.getAppAttemptId().getAttemptId())).
td(new Date(amInfo.getStartTime()).toString()).
td().a(".nodelink", url("http://", nodeHttpAddress),
nodeHttpAddress)._().
td().a(".logslink", url("logs", nodeId.toString(),
amInfo.getContainerId().toString(), jid, job.getUserName()),
"logs")._().
_();
}
table._();
div._();
html.div(_INFO_WRAP).
// Tasks table
table("#job").
tr().

View File

@ -0,0 +1,33 @@
package org.apache.hadoop.mapreduce.v2.hs.webapp;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.ENTITY_STRING;
import org.apache.hadoop.yarn.server.nodemanager.webapp.AggregatedLogsBlock;
import org.apache.hadoop.yarn.webapp.SubView;
public class HsLogsPage extends HsView {
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.v2.hs.webapp.HsView#preHead(org.apache.hadoop.yarn.webapp.hamlet.Hamlet.HTML)
*/
@Override protected void preHead(Page.HTML<_> html) {
String logEntity = $(ENTITY_STRING);
if (logEntity == null || logEntity.isEmpty()) {
logEntity = $(CONTAINER_ID);
}
if (logEntity == null || logEntity.isEmpty()) {
logEntity = "UNKNOWN";
}
commonPreHead(html);
}
/**
* The content of this page is the AggregatedLogsBlock
* @return AggregatedLogsBlock.class
*/
@Override protected Class<? extends SubView> content() {
return AggregatedLogsBlock.class;
}
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TFOOT;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.THEAD;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
@ -111,6 +112,8 @@ public class HsTaskPage extends HsView {
String taid = MRApps.toString(ta.getID());
String nodeHttpAddr = ta.getNodeHttpAddress();
String containerIdString = ta.getAssignedContainerID().toString();
String nodeIdString = ta.getAssignedContainerMgrAddress();
long attemptStartTime = ta.getLaunchTime();
long shuffleFinishTime = -1;
@ -134,12 +137,16 @@ public class HsTaskPage extends HsView {
int sortId = ta.getID().getId() + (ta.getID().getTaskId().getId() * 10000);
TR<TBODY<TABLE<Hamlet>>> row = tbody.tr();
row.
td().
br().$title(String.valueOf(sortId))._(). // sorting
_(taid)._().
td(ta.getState().toString()).
td().a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr)._();
TD<TR<TBODY<TABLE<Hamlet>>>> td = row.td();
td.br().$title(String.valueOf(sortId))._(). // sorting
    _(taid)._().
    td(ta.getState().toString()).
    td().a(".nodelink", url("http://", nodeHttpAddr), nodeHttpAddr);
td._(" ").a(".logslink",
url("logs", nodeIdString, containerIdString, taid, app.getJob()
.getUserName()), "logs");
td._();
row.td().
br().$title(String.valueOf(attemptStartTime))._().

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.mapreduce.v2.hs.webapp;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.NM_NODENAME;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.APP_OWNER;
import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@ -51,6 +55,10 @@ public class HsWebApp extends WebApp implements AMParams {
route(pajoin("/singletaskcounter",TASK_ID, COUNTER_GROUP, COUNTER_NAME),
HsController.class, "singleTaskCounter");
route("/about", HsController.class, "about");
route(pajoin("/logs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
HsController.class, "logs");
route(pajoin("/nmlogs", NM_NODENAME, CONTAINER_ID, ENTITY_STRING, APP_OWNER),
HsController.class, "nmlogs");
}
}
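With pajoin, each route parameter becomes a path segment, so a request against the new logs route would look like this hedged sketch (the /jobhistory prefix follows from the webapp name, and every segment value is a placeholder):

    // /logs/<nm-nodename>/<containerid>/<entity>/<app-owner>, e.g.:
    String exampleUrl = "/jobhistory/logs/testhost:2222/"
        + "container_1319783175297_0001_01_000001/"
        + "job_1319783175297_0001/someuser";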

View File

@ -115,6 +115,7 @@ public class TestJobHistoryParsing {
Assert.assertEquals("testhost", jobInfo.getAMInfos().get(0)
.getNodeManagerHost());
AMInfo amInfo = jobInfo.getAMInfos().get(0);
Assert.assertEquals(2222, amInfo.getNodeManagerPort());
Assert.assertEquals(3333, amInfo.getNodeManagerHttpPort());
Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()

View File

@ -22,8 +22,14 @@ import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.APP_ID;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.ATTEMPT_STATE;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.NM_NODENAME;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.APP_OWNER;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
@ -38,9 +44,12 @@ import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.webapp.AggregatedLogsPage;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
import org.junit.Test;
import static org.mockito.Mockito.verify;
import com.google.inject.Injector;
public class TestHSWebApp {
@ -188,4 +197,40 @@ public class TestHSWebApp {
WebAppTests.testPage(HsSingleCounterPage.class, AppContext.class,
new TestAppContext());
}
@Test
public void testLogsView1() throws IOException {
LOG.info("HsLogsPage");
Injector injector =
WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class,
new TestAppContext());
PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
verify(spyPw).write("Cannot get container logs without a ContainerId");
verify(spyPw).write("Cannot get container logs without a NodeId");
verify(spyPw).write("Cannot get container logs without an app owner");
}
@Test
public void testLogsView2() throws IOException {
LOG.info("HsLogsPage with data");
TestAppContext ctx = new TestAppContext();
Map<String, String> params = new HashMap<String, String>();
params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
.toString());
params.put(NM_NODENAME, BuilderUtils.newNodeId("testhost", 2222).toString());
params.put(ENTITY_STRING, "container_10_0001_01_000001");
params.put(APP_OWNER, "owner");
Injector injector =
WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx,
params);
PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
verify(spyPw).write(
"Logs not available for container_10_0001_01_000001. Aggregation "
+ "may not be complete,"
+ " Check back later or try the nodemanager on testhost:2222");
}
}

View File

@ -119,6 +119,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
LOG.info("Waiting for HistoryServer to start...");
Thread.sleep(1500);
}
//TODO Add a timeout. State.STOPPED check ?
if (historyServer.getServiceState() != STATE.STARTED) {
throw new IOException("HistoryServer failed to start");
}
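The TODO above asks for a bounded wait; one hedged way to write it, assuming a 60-second budget (the budget and the loop condition are assumptions, not part of this commit):

    long deadline = System.currentTimeMillis() + 60 * 1000L;
    while (historyServer.getServiceState() == STATE.INITED
        && System.currentTimeMillis() < deadline) {
      LOG.info("Waiting for HistoryServer to start...");
      Thread.sleep(1500);
    }
    if (historyServer.getServiceState() != STATE.STARTED) {
      throw new IOException("HistoryServer failed to start");
    }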

View File

@ -84,6 +84,11 @@
<Method name="handle" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<Match>
<Class name="~org\.apache\.hadoop\.yarn\.server\.nodemanager\.containermanager\.logaggregation\.LogAggregationService.*" />
<Method name="handle" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<!-- Ignore intentional switch fallthroughs -->
<Match>

View File

@ -42,12 +42,24 @@ public interface ApplicationConstants {
* only
*/
public static final String AM_CONTAINER_ID_ENV = "AM_CONTAINER_ID";
/**
* The environment variable for NM_HTTP_ADDRESS. Set in AppMaster environment
* The environment variable for the NM_HOST. Set in the AppMaster environment
* only
*/
public static final String NM_HTTP_ADDRESS_ENV = "NM_HTTP_ADDRESS";
public static final String NM_HOST_ENV = "NM_HOST";
/**
* The environment variable for the NM_PORT. Set in the AppMaster environment
* only
*/
public static final String NM_PORT_ENV = "NM_PORT";
/**
* The environment variable for the NM_HTTP_PORT. Set in the AppMaster environment
* only
*/
public static final String NM_HTTP_PORT_ENV = "NM_HTTP_PORT";
/**
* The environment variable for APP_SUBMIT_TIME. Set in AppMaster environment

View File

@ -137,6 +137,9 @@ public class YarnConfiguration extends Configuration {
YARN_PREFIX + "admin.acl";
public static final String DEFAULT_YARN_ADMIN_ACL = "*";
/** ACL used in case none is found. Allows nothing. */
public static final String DEFAULT_YARN_APP_ACL = " ";
/** The address of the RM admin interface.*/
public static final String RM_ADMIN_ADDRESS =
RM_PREFIX + "admin.address";
@ -290,10 +293,25 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
/** Whether to enable log aggregation */
public static final String NM_LOG_AGGREGATION_ENABLED = NM_PREFIX
+ "log-aggregation.enable";
/** Where to aggregate logs to.*/
public static final String NM_REMOTE_APP_LOG_DIR =
NM_PREFIX + "remote-app-log-dir";
public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs";
/**
* The remote log dir will be created at
* NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId}
*/
public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX =
NM_PREFIX + "remote-app-log-dir-suffix";
public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs";
public static final String YARN_LOG_SERVER_URL =
YARN_PREFIX + "log.server.url";
/** Amount of memory in GB that can be allocated for containers.*/
public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb";

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -129,6 +130,21 @@ public class ConverterUtils {
return cId.toString();
}
public static NodeId toNodeId(String nodeIdStr) {
String[] parts = nodeIdStr.split(":");
if (parts.length != 2) {
throw new IllegalArgumentException("Invalid NodeId [" + nodeIdStr
+ "]. Expected host:port");
}
try {
NodeId nodeId =
BuilderUtils.newNodeId(parts[0], Integer.parseInt(parts[1]));
return nodeId;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid port: " + parts[1], e);
}
}
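This centralizes the host:port parsing that RecoveryService previously did by hand (see the earlier hunk). A hedged usage sketch:

    NodeId nodeId = ConverterUtils.toNodeId("testhost:2222"); // host testhost, port 2222
    // Malformed input fails fast with IllegalArgumentException:
    //   toNodeId("testhost")     -> missing port
    //   toNodeId("testhost:abc") -> bad port number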
public static ContainerId toContainerId(String containerIdStr) {
Iterator<String> it = _split(containerIdStr).iterator();
if (!it.next().equals(CONTAINER_PREFIX)) {

View File

@ -28,4 +28,4 @@ public interface Params {
static final String TITLE_LINK = "title.href";
static final String USER = "user";
static final String ERROR_DETAILS = "error.details";
}
}

View File

@ -34,7 +34,9 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import java.io.IOException;
import java.io.PrintWriter;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpServletRequest;
@ -169,4 +171,16 @@ public class WebAppTests {
public static <T> Injector testBlock(Class<? extends SubView> block) {
return testBlock(block, null, null);
}
/**
* Convenience method to get the spy writer.
* @param injector the injector used for the test.
* @return The Spy writer.
* @throws IOException
*/
public static PrintWriter getPrintWriter(Injector injector)
throws IOException {
HttpServletResponse res = injector.getInstance(HttpServletResponse.class);
return res.getWriter();
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AdminACLsManager;
@InterfaceAudience.Private
@ -47,7 +48,7 @@ public class ApplicationACLsManager {
public ApplicationACLsManager(Configuration conf) {
this.conf = conf;
this.adminAclsManager = new AdminACLsManager(conf);
this.adminAclsManager = new AdminACLsManager(this.conf);
}
public boolean areACLsEnabled() {
@ -102,6 +103,16 @@ public class ApplicationACLsManager {
AccessControlList applicationACL = this.applicationACLS
.get(applicationId).get(applicationAccessType);
if (applicationACL == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("ACL not found for access-type " + applicationAccessType
+ " for application " + applicationId + " owned by "
+ applicationOwner + ". Using default ["
+ YarnConfiguration.DEFAULT_YARN_APP_ACL + "]");
}
applicationACL =
new AccessControlList(YarnConfiguration.DEFAULT_YARN_APP_ACL);
}
// Allow application-owner for any type of access on the application
if (this.adminAclsManager.isAdmin(callerUGI)

View File

@ -279,11 +279,24 @@
<value>/tmp/logs</value>
</property>
<property>
<description>Whether to enable log aggregation</description>
<name>yarn.nodemanager.log-aggregation.enable</name>
<value>false</value>
</property>
<property>
<description>Where to aggregate logs to.</description>
<name>yarn.nodemanager.remote-app-log-dir</name>
<value>/tmp/logs</value>
</property>
<property>
<description>The remote log dir will be created at
{yarn.nodemanager.remote-app-log-dir}/${user}/{thisParam}
</description>
<name>yarn.nodemanager.remote-app-log-dir-suffix</name>
<value>logs</value>
</property>
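Taken together, the three properties above control whether and where NodeManagers aggregate logs. A hedged sketch of setting them programmatically (the key names come from the properties above; values mirror the documented defaults):

    Configuration conf = new Configuration();
    conf.setBoolean("yarn.nodemanager.log-aggregation.enable", true);
    conf.set("yarn.nodemanager.remote-app-log-dir", "/tmp/logs");
    conf.set("yarn.nodemanager.remote-app-log-dir-suffix", "logs");
    // Aggregated logs for an application then land under
    // /tmp/logs/<user>/logs/<appId>, per the description above.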
<property>
<description>Amount of physical memory, in MB, that can be allocated

View File

@ -20,5 +20,5 @@ package org.apache.hadoop.yarn.server.nodemanager;
public enum ContainerManagerEventType {
FINISH_APPS,
FINISH_CONTAINERS
FINISH_CONTAINERS,
}

View File

@ -266,6 +266,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
new CMgrCompletedAppsEvent(appsToCleanup));
}
} catch (Throwable e) {
// TODO Better error handling. Thread can die with the rest of the
// NM still running.
LOG.error("Caught exception in status-updater", e);
break;
}

View File

@ -168,7 +168,7 @@ public class ContainerManagerImpl extends CompositeService implements
protected LogAggregationService createLogAggregationService(Context context,
DeletionService deletionService) {
return new LogAggregationService(context, deletionService);
return new LogAggregationService(this.dispatcher, context, deletionService);
}
public ContainersMonitor getContainersMonitor() {
@ -289,8 +289,9 @@ public class ContainerManagerImpl extends CompositeService implements
}
// Create the application
Application application = new ApplicationImpl(dispatcher,
this.aclsManager, launchContext.getUser(), applicationID, credentials);
Application application =
new ApplicationImpl(dispatcher, this.aclsManager,
launchContext.getUser(), applicationID, credentials, context);
if (null ==
context.getApplications().putIfAbsent(applicationID, application)) {
LOG.info("Creating a new application reference for app "
@ -319,6 +320,7 @@ public class ContainerManagerImpl extends CompositeService implements
}
@Override
@SuppressWarnings("unchecked")
public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException {
@ -398,20 +400,20 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public void handle(ApplicationEvent event) {
Application app =
ContainerManagerImpl.this.context.getApplications().get(
event.getApplicationID());
Application app =
ContainerManagerImpl.this.context.getApplications().get(
event.getApplicationID());
if (app != null) {
app.handle(event);
} else {
LOG.warn("Event " + event + " sent to absent application " +
event.getApplicationID());
LOG.warn("Event " + event + " sent to absent application "
+ event.getApplicationID());
}
}
}
@Override
@SuppressWarnings("unchecked")
public void handle(ContainerManagerEvent event) {
switch (event.getType()) {
case FINISH_APPS:

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.Event;
public class ApplicationEvent extends AbstractEvent<ApplicationEventType> {

View File

@ -32,6 +32,6 @@ public enum ApplicationEventType {
// Source: Container
APPLICATION_CONTAINER_FINISHED,
// Source: LogAggregationService.
APPLICATION_FINISHED,
// Source: Log Aggregation
APPLICATION_LOG_AGGREGATION_FINISHED
}

View File

@ -29,8 +29,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -48,7 +50,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* The state machine for the representation of an Application
@ -63,6 +64,7 @@ public class ApplicationImpl implements Application {
final ApplicationACLsManager aclsManager;
private final ReadLock readLock;
private final WriteLock writeLock;
private final Context context;
private static final Log LOG = LogFactory.getLog(Application.class);
@ -71,12 +73,13 @@ public class ApplicationImpl implements Application {
public ApplicationImpl(Dispatcher dispatcher,
ApplicationACLsManager aclsManager, String user, ApplicationId appId,
Credentials credentials) {
Credentials credentials, Context context) {
this.dispatcher = dispatcher;
this.user = user.toString();
this.appId = appId;
this.credentials = credentials;
this.aclsManager = aclsManager;
this.context = context;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
@ -173,7 +176,13 @@ public class ApplicationImpl implements Application {
ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
new AppCompletelyDoneTransition())
// Transitions from FINISHED state
.addTransition(ApplicationState.FINISHED,
ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
new AppLogsAggregatedTransition())
// create the topology tables
.installTopology();
@ -239,11 +248,15 @@ public class ApplicationImpl implements Application {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
Map<ApplicationAccessType, String> appAcls =
app.getContainers().values().iterator().next().getLaunchContext()
.getApplicationACLs();
// Inform the logAggregator
app.dispatcher.getEventHandler().handle(
new LogAggregatorAppStartedEvent(app.appId, app.user,
app.credentials,
ContainerLogsRetentionPolicy.ALL_CONTAINERS)); // TODO: Fix
new LogAggregatorAppStartedEvent(app.appId, app.user,
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
appAcls));
// Start all the containers waiting for ApplicationInit
for (Container container : app.containers.values()) {
@ -339,13 +352,20 @@ public class ApplicationImpl implements Application {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
app.aclsManager.removeApplication(app.getAppId());
// Inform the logService
app.dispatcher.getEventHandler().handle(
new LogAggregatorAppFinishedEvent(app.appId));
// TODO: Also make logService write the acls to the aggregated file.
}
}
static class AppLogsAggregatedTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
ApplicationId appId = event.getApplicationID();
app.context.getApplications().remove(appId);
app.aclsManager.removeApplication(appId);
}
}
@ -377,6 +397,6 @@ public class ApplicationImpl implements Application {
@Override
public String toString() {
return ConverterUtils.toString(appId);
return appId.toString();
}
}

View File

@ -119,11 +119,11 @@ public class ContainerLaunch implements Callable<Integer> {
// /////////////////////////// Variable expansion
// Before the container script gets written out.
List<String> newCmds = new ArrayList<String>(command.size());
String appIdStr = app.toString();
String appIdStr = app.getAppId().toString();
Path containerLogDir =
this.logDirsSelector.getLocalPathForWrite(appIdStr + Path.SEPARATOR
+ containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, this.conf,
false);
this.logDirsSelector.getLocalPathForWrite(ContainerLaunch
.getRelativeContainerLogDir(appIdStr, containerIdStr),
LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
for (String str : command) {
// TODO: Should we instead work via symlinks without this grammar?
newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
@ -384,6 +384,11 @@ public class ContainerLaunch implements Callable<Integer> {
return processId;
}
public static String getRelativeContainerLogDir(String appIdStr,
String containerIdStr) {
return appIdStr + Path.SEPARATOR + containerIdStr;
}
private String getContainerPrivateDir(String appIdStr, String containerIdStr) {
return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr
+ Path.SEPARATOR;

View File

@ -25,10 +25,16 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.Writer;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -41,6 +47,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -48,32 +56,50 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
public class AggregatedLogFormat {
static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
private static final LogKey VERSION_KEY = new LogKey("VERSION");
private static final Map<String, LogKey> RESERVED_KEYS;
//Maybe write out the retention policy.
//Maybe write out a list of containerLogs skipped by the retention policy.
private static final int VERSION = 1;
static {
RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
}
public static class LogKey implements Writable {
private String containerId;
private String keyString;
public LogKey() {
}
public LogKey(ContainerId containerId) {
this.containerId = ConverterUtils.toString(containerId);
this.keyString = containerId.toString();
}
public LogKey(String keyString) {
this.keyString = keyString;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.containerId);
out.writeUTF(this.keyString);
}
@Override
public void readFields(DataInput in) throws IOException {
this.containerId = in.readUTF();
this.keyString = in.readUTF();
}
@Override
public String toString() {
return this.containerId;
return this.keyString;
}
}
@ -81,6 +107,8 @@ public class AggregatedLogFormat {
private final String[] rootLogDirs;
private final ContainerId containerId;
// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format
public LogValue(String[] rootLogDirs, ContainerId containerId) {
this.rootLogDirs = rootLogDirs;
@ -141,7 +169,8 @@ public class AggregatedLogFormat {
public FSDataOutputStream run() throws Exception {
return FileContext.getFileContext(conf).create(
remoteAppLogFile,
EnumSet.of(CreateFlag.CREATE), new Options.CreateOpts[] {});
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
new Options.CreateOpts[] {});
}
});
} catch (InterruptedException e) {
@ -154,6 +183,40 @@ public class AggregatedLogFormat {
new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
//Write the version string
writeVersion();
}
private void writeVersion() throws IOException {
DataOutputStream out = this.writer.prepareAppendKey(-1);
VERSION_KEY.write(out);
out.close();
out = this.writer.prepareAppendValue(-1);
out.writeInt(VERSION);
out.close();
this.fsDataOStream.hflush();
}
public void writeApplicationOwner(String user) throws IOException {
DataOutputStream out = this.writer.prepareAppendKey(-1);
APPLICATION_OWNER_KEY.write(out);
out.close();
out = this.writer.prepareAppendValue(-1);
out.writeUTF(user);
out.close();
}
public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
throws IOException {
DataOutputStream out = this.writer.prepareAppendKey(-1);
APPLICATION_ACL_KEY.write(out);
out.close();
out = this.writer.prepareAppendValue(-1);
for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
out.writeUTF(entry.getKey().toString());
out.writeUTF(entry.getValue());
}
out.close();
}
public void append(LogKey logKey, LogValue logValue) throws IOException {
@ -184,12 +247,13 @@ public class AggregatedLogFormat {
private final FSDataInputStream fsDataIStream;
private final TFile.Reader.Scanner scanner;
private final TFile.Reader reader;
public LogReader(Configuration conf, Path remoteAppLogFile)
throws IOException {
FileContext fileContext = FileContext.getFileContext(conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
TFile.Reader reader =
reader =
new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
@ -197,6 +261,69 @@ public class AggregatedLogFormat {
private boolean atBeginning = true;
/**
* Returns the owner of the application.
*
* @return the application owner.
* @throws IOException
*/
public String getApplicationOwner() throws IOException {
TFile.Reader.Scanner ownerScanner = reader.createScanner();
LogKey key = new LogKey();
while (!ownerScanner.atEnd()) {
TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
key.readFields(entry.getKeyStream());
if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
DataInputStream valueStream = entry.getValueStream();
return valueStream.readUTF();
}
ownerScanner.advance();
}
return null;
}
/**
* Returns ACLs for the application. An empty map is returned if no ACLs are
* found.
*
* @return a map of the Application ACLs.
* @throws IOException
*/
public Map<ApplicationAccessType, String> getApplicationAcls()
throws IOException {
// TODO Seek directly to the key once a comparator is specified.
TFile.Reader.Scanner aclScanner = reader.createScanner();
LogKey key = new LogKey();
Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>();
while (!aclScanner.atEnd()) {
TFile.Reader.Scanner.Entry entry = aclScanner.entry();
key.readFields(entry.getKeyStream());
if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
DataInputStream valueStream = entry.getValueStream();
while (true) {
String appAccessOp = null;
String aclString = null;
try {
appAccessOp = valueStream.readUTF();
} catch (EOFException e) {
// Valid end of stream.
break;
}
try {
aclString = valueStream.readUTF();
} catch (EOFException e) {
throw new YarnException("Error reading ACLs", e);
}
acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
}
}
aclScanner.advance();
}
return acls;
}
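Reading the metadata and logs back out of an aggregated file, as a hedged sketch (conf and remoteAppLogFile are assumed to be in scope):

    AggregatedLogFormat.LogReader logReader =
        new AggregatedLogFormat.LogReader(conf, remoteAppLogFile);
    String owner = logReader.getApplicationOwner();
    Map<ApplicationAccessType, String> acls = logReader.getApplicationAcls();
    AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
    DataInputStream valueStream = logReader.next(key); // reserved META keys are skipped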
/**
* Read the next key and return the value-stream.
*
@ -215,10 +342,99 @@ public class AggregatedLogFormat {
}
TFile.Reader.Scanner.Entry entry = this.scanner.entry();
key.readFields(entry.getKeyStream());
// Skip META keys
if (RESERVED_KEYS.containsKey(key.toString())) {
return next(key);
}
DataInputStream valueStream = entry.getValueStream();
return valueStream;
}
//TODO Change Log format and interfaces to be containerId specific.
// Avoid returning completeValueStreams.
// public List<String> getTypesForContainer(DataInputStream valueStream){}
//
// /**
// * @param valueStream
// * The Log stream for the container.
// * @param fileType
// * the log type required.
// * @return An InputStreamReader for the required log type or null if the
// * type is not found.
// * @throws IOException
// */
// public InputStreamReader getLogStreamForType(DataInputStream valueStream,
// String fileType) throws IOException {
// valueStream.reset();
// try {
// while (true) {
// String ft = valueStream.readUTF();
// String fileLengthStr = valueStream.readUTF();
// long fileLength = Long.parseLong(fileLengthStr);
// if (ft.equals(fileType)) {
// BoundedInputStream bis =
// new BoundedInputStream(valueStream, fileLength);
// return new InputStreamReader(bis);
// } else {
// long totalSkipped = 0;
// long currSkipped = 0;
// while (currSkipped != -1 && totalSkipped < fileLength) {
// currSkipped = valueStream.skip(fileLength - totalSkipped);
// totalSkipped += currSkipped;
// }
// // TODO Verify skip behaviour.
// if (currSkipped == -1) {
// return null;
// }
// }
// }
// } catch (EOFException e) {
// return null;
// }
// }
/**
* Writes all logs for a single container to the provided writer.
* @param valueStream
* @param writer
* @throws IOException
*/
public static void readAcontainerLogs(DataInputStream valueStream,
Writer writer) throws IOException {
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
String fileType;
String fileLengthStr;
long fileLength;
while (true) {
try {
fileType = valueStream.readUTF();
} catch (EOFException e) {
// EndOfFile
return;
}
fileLengthStr = valueStream.readUTF();
fileLength = Long.parseLong(fileLengthStr);
writer.write("\n\nLogType:");
writer.write(fileType);
writer.write("\nLogLength:");
writer.write(fileLengthStr);
writer.write("\nLog Contents:\n");
// ByteLevel
BoundedInputStream bis =
new BoundedInputStream(valueStream, fileLength);
InputStreamReader reader = new InputStreamReader(bis);
int currentRead = 0;
int totalRead = 0;
while ((currentRead = reader.read(cbuf, 0, bufferSize)) != -1) {
writer.write(cbuf, 0, currentRead); // write only the chars read this iteration
totalRead += currentRead;
}
}
}
/**
* Keep calling this till you get a {@link EOFException} for getting logs of
* all types for a single container.

View File

@ -18,8 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@ -27,11 +28,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogWriter;
@ -42,7 +48,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private static final Log LOG = LogFactory
.getLog(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
private static final String TMP_FILE_SUFFIX = ".tmp";
private final Dispatcher dispatcher;
private final ApplicationId appId;
private final String applicationId;
private boolean logAggregationDisabled = false;
private final Configuration conf;
@ -50,26 +59,34 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final UserGroupInformation userUgi;
private final String[] rootLogDirs;
private final Path remoteNodeLogFileForApp;
private final Path remoteNodeTmpLogFileForApp;
private final ContainerLogsRetentionPolicy retentionPolicy;
private final BlockingQueue<ContainerId> pendingContainers;
private final AtomicBoolean appFinishing = new AtomicBoolean();
private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
private final Map<ApplicationAccessType, String> appAcls;
private LogWriter writer = null;
public AppLogAggregatorImpl(DeletionService deletionService,
Configuration conf, ApplicationId appId, UserGroupInformation userUgi,
String[] localRootLogDirs, Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy) {
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf, ApplicationId appId,
UserGroupInformation userUgi, String[] localRootLogDirs,
Path remoteNodeLogFileForApp,
ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
this.appId = appId;
this.applicationId = ConverterUtils.toString(appId);
this.userUgi = userUgi;
this.rootLogDirs = localRootLogDirs;
this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
this.retentionPolicy = retentionPolicy;
this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
this.appAcls = appAcls;
}
private void uploadLogsForContainer(ContainerId containerId) {
@ -80,11 +97,15 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
// Lazy creation of the writer
if (this.writer == null) {
LOG.info("Starting aggregate log-file for app " + this.applicationId);
LOG.info("Starting aggregate log-file for app " + this.applicationId
+ " at " + this.remoteNodeTmpLogFileForApp);
try {
this.writer =
new LogWriter(this.conf, this.remoteNodeLogFileForApp,
new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
this.userUgi);
//Write ACLs once when and if the writer is created.
this.writer.writeApplicationACLs(appAcls);
this.writer.writeApplicationOwner(this.userUgi.getShortUserName());
} catch (IOException e) {
LOG.error("Cannot create writer for app " + this.applicationId
+ ". Disabling log-aggregation for this app.", e);
@ -105,8 +126,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
}
@Override
public void run() {
@SuppressWarnings("unchecked")
public void run() {
ContainerId containerId;
while (!this.appFinishing.get()) {
@ -141,10 +162,33 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
this.writer.closeWriter();
LOG.info("Finished aggregate log-file for app " + this.applicationId);
}
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
FileSystem remoteFS = FileSystem.get(conf);
remoteFS.rename(remoteNodeTmpLogFileForApp, remoteNodeLogFileForApp);
return null;
}
});
} catch (Exception e) {
LOG.error("Failed to move temporary log file to final location: ["
+ remoteNodeTmpLogFileForApp + "] to [" + remoteNodeLogFileForApp
+ "]", e);
}
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(this.appId,
ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED));
this.appAggregationFinished.set(true);
}
private Path getRemoteNodeTmpLogFileForApp() {
return new Path(remoteNodeLogFileForApp.getParent(),
(remoteNodeLogFileForApp.getName() + TMP_FILE_SUFFIX));
}
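// Editor's sketch (hedged; not part of the original patch): because the
// writer targets the ".tmp" name until run() renames it, a lister that wants
// only finished aggregates can filter on TMP_FILE_SUFFIX, per the TODO in
// LogAggregationService below. Illustrative helper:
private static boolean isTmpLogFile(Path file) {
// ".tmp" files are aggregates still being written by this class.
return file.getName().endsWith(TMP_FILE_SUFFIX);
}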
private boolean shouldUploadLogs(ContainerId containerId,
boolean wasContainerSuccessful) {

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@ -31,20 +31,26 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -54,20 +60,43 @@ public class LogAggregationService extends AbstractService implements
private static final Log LOG = LogFactory
.getLog(LogAggregationService.class);
/*
* Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
* Group to which NMOwner belongs>. App dirs will be created as 750,
* owner=<AppOwner>, group=<NMGroup>, so that the owner and <NMOwner> can
* access/modify the files.
* <NMGroup> should obviously be a limited-access group.
*/
/**
* Permissions for the top level directory under which app directories will be
* created.
*/
private static final FsPermission TLDIR_PERMISSIONS = FsPermission
.createImmutable((short) 01777);
/**
* Permissions for the Application directory.
*/
private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
.createImmutable((short) 0750);
private final Context context;
private final DeletionService deletionService;
private final Dispatcher dispatcher;
private String[] localRootLogDirs;
Path remoteRootLogDir;
private String nodeFile;
String remoteRootLogDirSuffix;
private NodeId nodeId;
private boolean isLogAggregationEnabled = false;
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
private final ExecutorService threadPool;
public LogAggregationService(Context context,
public LogAggregationService(Dispatcher dispatcher, Context context,
DeletionService deletionService) {
super(LogAggregationService.class.getName());
this.dispatcher = dispatcher;
this.context = context;
this.deletionService = deletionService;
this.appLogAggregators =
@ -80,10 +109,17 @@ public class LogAggregationService extends AbstractService implements
public synchronized void init(Configuration conf) {
this.localRootLogDirs =
conf.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
YarnConfiguration.DEFAULT_NM_LOG_DIRS);
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
this.isLogAggregationEnabled =
conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
super.init(conf);
}
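// Editor's sketch (hedged; not part of the original patch): a minimal
// Configuration exercising the three keys read in init() above. The literal
// values are illustrative assumptions, not defaults introduced here.
private static Configuration sketchAggregationConf() {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "/tmp/logs");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "logs");
return conf;
}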
@ -91,26 +127,11 @@ public class LogAggregationService extends AbstractService implements
public synchronized void start() {
// NodeId is only available during start; the following cannot be moved
// anywhere else.
this.nodeFile = this.context.getNodeId().toString();
this.nodeId = this.context.getNodeId();
verifyAndCreateRemoteLogDir(getConfig());
super.start();
}
Path getRemoteNodeLogFileForApp(ApplicationId appId) {
return getRemoteNodeLogFileForApp(this.remoteRootLogDir, appId,
this.nodeFile);
}
static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
ApplicationId appId, String nodeFile) {
return new Path(getRemoteAppLogDir(remoteRootLogDir, appId),
nodeFile);
}
static Path getRemoteAppLogDir(Path remoteRootLogDir,
ApplicationId appId) {
return new Path(remoteRootLogDir, ConverterUtils.toString(appId));
}
@Override
public synchronized void stop() {
LOG.info(this.getName() + " waiting for pending aggregation during exit");
@ -120,8 +141,217 @@ public class LogAggregationService extends AbstractService implements
super.stop();
}
/**
* Constructs the full filename for an application's log file per node.
* @param remoteRootLogDir
* @param appId
* @param user
* @param nodeId
* @param suffix
* @return the remote log file.
*/
public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
ApplicationId appId, String user, NodeId nodeId, String suffix) {
return new Path(getRemoteAppLogDir(remoteRootLogDir, appId, user, suffix),
getNodeString(nodeId));
}
/**
* Gets the remote app log dir.
* @param remoteRootLogDir
* @param appId
* @param user
* @param suffix
* @return the remote application specific log dir.
*/
public static Path getRemoteAppLogDir(Path remoteRootLogDir,
ApplicationId appId, String user, String suffix) {
return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix),
appId.toString());
}
/**
* Gets the remote suffixed log dir for the user.
* @param remoteRootLogDir
* @param user
* @param suffix
* @return the remote suffixed log dir.
*/
private static Path getRemoteLogSuffixedDir(Path remoteRootLogDir,
String user, String suffix) {
if (suffix == null || suffix.isEmpty()) {
return getRemoteLogUserDir(remoteRootLogDir, user);
}
// TODO Maybe support suffix to be more than a single file.
return new Path(getRemoteLogUserDir(remoteRootLogDir, user), suffix);
}
// TODO Add a utility method to list available log files. Ignore the
// temporary ones.
/**
* Gets the remote log user dir.
* @param remoteRootLogDir
* @param user
* @return the remote per user log dir.
*/
private static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) {
return new Path(remoteRootLogDir, user);
}
/**
* Returns the suffix component of the log dir.
* @param conf
* @return the suffix which will be appended to the user log dir.
*/
public static String getRemoteNodeLogDirSuffix(Configuration conf) {
return conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
}
/**
* Converts a nodeId to a form used in the app log file name.
* @param nodeId
* @return the node string to be used to construct the file name.
*/
private static String getNodeString(NodeId nodeId) {
return nodeId.toString().replace(":", "_");
}
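// Editor's sketch (hedged; not part of the original patch): the end-to-end
// path produced by the helpers above. All literal values are assumptions;
// BuilderUtils.newApplicationId is borrowed from the tests in this commit.
private static void sketchRemotePathLayout() {
Path remoteRoot = new Path("/tmp/logs");
ApplicationId appId =
org.apache.hadoop.yarn.util.BuilderUtils.newApplicationId(1234, 1);
NodeId nodeId = ConverterUtils.toNodeId("host1:4545");
Path p = getRemoteNodeLogFileForApp(remoteRoot, appId, "jdoe", nodeId, "logs");
// p == /tmp/logs/jdoe/logs/application_1234_0001/host1_4545
}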
private void verifyAndCreateRemoteLogDir(Configuration conf) {
// Checking the existence of the TLD
FileSystem remoteFS = null;
try {
remoteFS = FileSystem.get(conf);
} catch (IOException e) {
throw new YarnException("Unable to get Remote FileSystem isntance", e);
}
boolean remoteExists = false;
try {
remoteExists = remoteFS.exists(this.remoteRootLogDir);
} catch (IOException e) {
throw new YarnException("Failed to check for existance of remoteLogDir ["
+ this.remoteRootLogDir + "]");
}
if (remoteExists) {
try {
FsPermission perms =
remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
if (!perms.equals(TLDIR_PERMISSIONS)) {
LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
+ "] already exist, but with incorrect permissions. "
+ "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+ "]." + " The cluster may have problems with multiple users.");
}
} catch (IOException e) {
throw new YarnException(
"Failed while attempting to check permissions for dir ["
+ this.remoteRootLogDir + "]");
}
} else {
LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
+ "] does not exist. Attempting to create it.");
try {
Path qualified =
this.remoteRootLogDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
} catch (IOException e) {
throw new YarnException("Failed to create remoteLogDir ["
+ this.remoteRootLogDir + "]", e);
}
}
}
Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
return LogAggregationService.getRemoteNodeLogFileForApp(
this.remoteRootLogDir, appId, user, this.nodeId,
this.remoteRootLogDirSuffix);
}
private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
throws IOException {
fs.mkdirs(path, new FsPermission(fsPerm));
fs.setPermission(path, new FsPermission(fsPerm));
}
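// Editor's note (hedged): mkdirs() applies the process umask to the requested
// mode, so the explicit setPermission() in createDir() above is presumably
// what guarantees the exact 0750/01777 permissions. In effect:
//
// fs.mkdirs(path, perm); // resulting mode may be perm & ~umask
// fs.setPermission(path, perm); // forces the exact requested mode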
private void createAppDir(final String user, final ApplicationId appId,
UserGroupInformation userUgi) {
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// TODO: Reuse FS for user?
FileSystem remoteFS = null;
Path userDir = null;
Path suffixDir = null;
Path appDir = null;
try {
remoteFS = FileSystem.get(getConfig());
} catch (IOException e) {
LOG.error("Failed to get remote FileSystem while processing app "
+ appId, e);
throw e;
}
try {
userDir =
getRemoteLogUserDir(
LogAggregationService.this.remoteRootLogDir, user);
userDir =
userDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
} catch (IOException e) {
LOG.error("Failed to create user dir [" + userDir
+ "] while processing app " + appId);
throw e;
}
try {
suffixDir =
getRemoteLogSuffixedDir(
LogAggregationService.this.remoteRootLogDir, user,
LogAggregationService.this.remoteRootLogDirSuffix);
suffixDir =
suffixDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
} catch (IOException e) {
LOG.error("Failed to create suffixed user dir [" + suffixDir
+ "] while processing app " + appId);
throw e;
}
try {
appDir =
getRemoteAppLogDir(LogAggregationService.this.remoteRootLogDir,
appId, user,
LogAggregationService.this.remoteRootLogDirSuffix);
appDir =
appDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
} catch (IOException e) {
LOG.error("Failed to create application log dir [" + appDir
+ "] while processing app " + appId);
throw e;
}
return null;
}
});
} catch (Exception e) {
throw new YarnException(e);
}
}
private void initApp(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy) {
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
// Get user's FileSystem credentials
UserGroupInformation userUgi =
@ -133,41 +363,27 @@ public class LogAggregationService extends AbstractService implements
}
}
// Create the app dir
createAppDir(user, appId, userUgi);
// New application
AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.deletionService, getConfig(), appId,
userUgi, this.localRootLogDirs,
getRemoteNodeLogFileForApp(appId), logRetentionPolicy);
new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId,
userUgi, this.localRootLogDirs,
getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, appAcls);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnException("Duplicate initApp for " + appId);
}
// Create the app dir
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// TODO: Reuse FS for user?
FileSystem remoteFS = FileSystem.get(getConfig());
remoteFS.mkdirs(getRemoteAppLogDir(
LogAggregationService.this.remoteRootLogDir, appId)
.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
return null;
}
});
} catch (Exception e) {
throw new YarnException(e);
}
// Get the user configuration for the list of containers that need log
// TODO Get the user configuration for the list of containers that need log
// aggregation.
// Schedule the aggregator.
this.threadPool.execute(appLogAggregator);
}
private void stopContainer(ContainerId containerId, String exitCode) {
private void stopContainer(ContainerId containerId, int exitCode) {
// A container is complete. Put this container's logs up for aggregation if
// they are needed.
@ -179,7 +395,7 @@ public class LogAggregationService extends AbstractService implements
}
this.appLogAggregators.get(
containerId.getApplicationAttemptId().getApplicationId())
.startContainerLogAggregation(containerId, exitCode.equals("0"));
.startContainerLogAggregation(containerId, exitCode == 0);
}
private void stopApp(ApplicationId appId) {
@ -196,27 +412,30 @@ public class LogAggregationService extends AbstractService implements
@Override
public void handle(LogAggregatorEvent event) {
// switch (event.getType()) {
// case APPLICATION_STARTED:
// LogAggregatorAppStartedEvent appStartEvent =
// (LogAggregatorAppStartedEvent) event;
// initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
// appStartEvent.getCredentials(),
// appStartEvent.getLogRetentionPolicy());
// break;
// case CONTAINER_FINISHED:
// LogAggregatorContainerFinishedEvent containerFinishEvent =
// (LogAggregatorContainerFinishedEvent) event;
// stopContainer(containerFinishEvent.getContainerId(),
// containerFinishEvent.getExitCode());
// break;
// case APPLICATION_FINISHED:
// LogAggregatorAppFinishedEvent appFinishedEvent =
// (LogAggregatorAppFinishedEvent) event;
// stopApp(appFinishedEvent.getApplicationId());
// break;
// default:
// ; // Ignore
// }
if (this.isLogAggregationEnabled) {
switch (event.getType()) {
case APPLICATION_STARTED:
LogAggregatorAppStartedEvent appStartEvent =
(LogAggregatorAppStartedEvent) event;
initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
appStartEvent.getCredentials(),
appStartEvent.getLogRetentionPolicy(),
appStartEvent.getApplicationAcls());
break;
case CONTAINER_FINISHED:
LogAggregatorContainerFinishedEvent containerFinishEvent =
(LogAggregatorContainerFinishedEvent) event;
stopContainer(containerFinishEvent.getContainerId(),
containerFinishEvent.getExitCode());
break;
case APPLICATION_FINISHED:
LogAggregatorAppFinishedEvent appFinishedEvent =
(LogAggregatorAppFinishedEvent) event;
stopApp(appFinishedEvent.getApplicationId());
break;
default:
; // Ignore
}
}
}
}
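// Editor's sketch (hedged; not part of the original patch): how a caller
// drives LogAggregationService through the events handled above, mirroring
// the tests later in this commit. Identifiers are illustrative.
//
// service.handle(new LogAggregatorAppStartedEvent(appId, user, credentials,
// ContainerLogsRetentionPolicy.ALL_CONTAINERS, appAcls));
// service.handle(new LogAggregatorContainerFinishedEvent(containerId, 0));
// service.handle(new LogAggregatorAppFinishedEvent(appId));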

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -110,7 +111,12 @@ public class LogDumper extends Configured implements Tool {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(getConf(),
LogAggregationService.getRemoteNodeLogFileForApp(
remoteRootLogDir, appId, nodeAddress));
remoteRootLogDir,
appId,
UserGroupInformation.getCurrentUser().getShortUserName(),
ConverterUtils.toNodeId(nodeAddress),
getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)));
return dumpAContainerLogs(containerIdStr, reader, out);
}
@ -152,16 +158,21 @@ public class LogDumper extends Configured implements Tool {
Path remoteRootLogDir =
new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
String logDirSuffix =
getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
//TODO Change this to get a list of files from the LAS.
Path remoteAppLogDir =
LogAggregationService.getRemoteAppLogDir(remoteRootLogDir, appId);
LogAggregationService.getRemoteAppLogDir(remoteRootLogDir, appId, user,
logDirSuffix);
RemoteIterator<FileStatus> nodeFiles =
FileContext.getFileContext().listStatus(remoteAppLogDir);
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(getConf(),
LogAggregationService.getRemoteNodeLogFileForApp(
remoteRootLogDir, appId, thisNodeFile.getPath().getName()));
new Path(remoteAppLogDir, thisNodeFile.getPath().getName()));
try {
DataInputStream valueStream;

View File

@ -1,25 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event;
import java.util.Map;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
@ -28,14 +31,17 @@ public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
private final ContainerLogsRetentionPolicy retentionPolicy;
private final String user;
private final Credentials credentials;
private final Map<ApplicationAccessType, String> appAcls;
public LogAggregatorAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy) {
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
super(LogAggregatorEventType.APPLICATION_STARTED);
this.applicationId = appId;
this.user = user;
this.credentials = credentials;
this.retentionPolicy = retentionPolicy;
this.appAcls = appAcls;
}
public ApplicationId getApplicationId() {
@ -54,4 +60,8 @@ public class LogAggregatorAppStartedEvent extends LogAggregatorEvent {
return this.user;
}
public Map<ApplicationAccessType, String> getApplicationAcls() {
return this.appAcls;
}
}

View File

@ -0,0 +1,175 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.NM_NODENAME;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.APP_OWNER;
import java.io.DataInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject;
public class AggregatedLogsBlock extends HtmlBlock {
private final Configuration conf;
@Inject
AggregatedLogsBlock(Configuration conf) {
this.conf = conf;
}
@Override
protected void render(Block html) {
ContainerId containerId = verifyAndGetContainerId(html);
NodeId nodeId = verifyAndGetNodeId(html);
String appOwner = verifyAndGetAppOwner(html);
if (containerId == null || nodeId == null || appOwner == null
|| appOwner.isEmpty()) {
return;
}
ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId();
String logEntity = $(ENTITY_STRING);
if (logEntity == null || logEntity.isEmpty()) {
logEntity = containerId.toString();
}
Path remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
AggregatedLogFormat.LogReader reader = null;
try {
reader =
new AggregatedLogFormat.LogReader(conf,
LogAggregationService.getRemoteNodeLogFileForApp(
remoteRootLogDir, applicationId, appOwner, nodeId,
LogAggregationService.getRemoteNodeLogDirSuffix(conf)));
} catch (FileNotFoundException e) {
// ACLs are not available until the log file is opened.
html.h1()
._("Logs not available for "
+ logEntity
+ ". Aggregation may not be complete, "
+ "Check back later or try the nodemanager on "
+ nodeId)._();
return;
} catch (IOException e) {
html.h1()._("Error getting logs for " + logEntity)._();
LOG.error("Error getting logs for " + logEntity, e);
return;
}
String owner = null;
Map<ApplicationAccessType, String> appAcls = null;
try {
owner = reader.getApplicationOwner();
appAcls = reader.getApplicationAcls();
} catch (IOException e) {
html.h1()._("Error getting logs for " + logEntity)._();
LOG.error("Error getting logs for " + logEntity, e);
return;
}
ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
aclsManager.addApplication(applicationId, appAcls);
String remoteUser = request().getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null
&& !aclsManager.checkAccess(callerUGI, ApplicationAccessType.VIEW_APP,
owner, applicationId)) {
html.h1()
._("User [" + remoteUser
+ "] is not authorized to view the logs for " + logEntity)._();
return;
}
DataInputStream valueStream;
LogKey key = new LogKey();
try {
valueStream = reader.next(key);
while (valueStream != null
&& !key.toString().equals(containerId.toString())) {
valueStream = reader.next(key);
}
if (valueStream == null) {
html.h1()._(
"Logs not available for " + logEntity
+ ". Could be caused by the rentention policy")._();
return;
}
writer().write("<pre>");
AggregatedLogFormat.LogReader.readAcontainerLogs(valueStream, writer());
writer().write("</pre>");
return;
} catch (IOException e) {
html.h1()._("Error getting logs for " + logEntity)._();
LOG.error("Error getting logs for " + logEntity, e);
return;
}
}
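// Editor's sketch (hedged; not part of the original patch): the key-scan
// pattern render() uses to locate one container's logs inside an aggregate
// file. Parameter names are illustrative.
private static void sketchDumpContainer(AggregatedLogFormat.LogReader reader,
String containerIdStr, java.io.Writer out) throws IOException {
LogKey key = new LogKey();
DataInputStream valueStream = reader.next(key);
// Keys are container ids; skip entries until the requested one is found.
while (valueStream != null && !key.toString().equals(containerIdStr)) {
valueStream = reader.next(key);
}
if (valueStream != null) {
AggregatedLogFormat.LogReader.readAcontainerLogs(valueStream, out);
}
}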
private ContainerId verifyAndGetContainerId(Block html) {
String containerIdStr = $(CONTAINER_ID);
if (containerIdStr == null || containerIdStr.isEmpty()) {
html.h1()._("Cannot get container logs without a ContainerId")._();
return null;
}
ContainerId containerId = null;
try {
containerId = ConverterUtils.toContainerId(containerIdStr);
} catch (IllegalArgumentException e) {
html.h1()
._("Cannot get container logs for invalid containerId: "
+ containerIdStr)._();
return null;
}
return containerId;
}
private NodeId verifyAndGetNodeId(Block html) {
String nodeIdStr = $(NM_NODENAME);
if (nodeIdStr == null || nodeIdStr.isEmpty()) {
html.h1()._("Cannot get container logs without a NodeId")._();
return null;
}
NodeId nodeId = null;
try {
nodeId = ConverterUtils.toNodeId(nodeIdStr);
} catch (IllegalArgumentException e) {
html.h1()._("Cannot get container logs. Invalid nodeId: " + nodeIdStr)
._();
return null;
}
return nodeId;
}
private String verifyAndGetAppOwner(Block html) {
String appOwner = $(APP_OWNER);
if (appOwner == null || appOwner.isEmpty()) {
html.h1()._("Cannot get container logs without an app owner")._();
}
return appOwner;
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
public class AggregatedLogsNavBlock extends HtmlBlock implements NMWebParams {
@Override
protected void render(Block html) {
html
.div("#nav")
.h3()._("Logs")._() //
._()
.div("#themeswitcher")._();
}
}

View File

@ -0,0 +1,44 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.ENTITY_STRING;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.THEMESWITCHER_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import org.apache.hadoop.yarn.webapp.SubView;
public class AggregatedLogsPage extends NMView {
/* (non-Javadoc)
* @see org.apache.hadoop.yarn.server.nodemanager.webapp.NMView#preHead(org.apache.hadoop.yarn.webapp.hamlet.Hamlet.HTML)
*/
@Override
protected void preHead(Page.HTML<_> html) {
String logEntity = $(ENTITY_STRING);
if (logEntity == null || logEntity.isEmpty()) {
logEntity = $(CONTAINER_ID);
}
if (logEntity == null || logEntity.isEmpty()) {
logEntity = "UNKNOWN";
}
set(TITLE, join("Logs for ", logEntity));
set(ACCORDION_ID, "nav");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
set(THEMESWITCHER_ID, "themeswitcher");
}
@Override
protected Class<? extends SubView> content() {
return AggregatedLogsBlock.class;
}
@Override
protected Class<? extends SubView> nav() {
return AggregatedLogsNavBlock.class;
}
}

View File

@ -91,7 +91,8 @@ public class AllContainersPage extends NMView {
._()
.td()._(container.getContainerState())._()
.td()
.a(url("containerlogs", containerIdStr), "logs")._()
.a(url("containerlogs", containerIdStr, container.getUser()),
"logs")._()
._();
}
tableBody._()._()._();

View File

@ -18,9 +18,17 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.THEMESWITCHER_ID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@ -30,32 +38,53 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import com.google.inject.Inject;
public class ContainerLogsPage extends NMView {
public static final String REDIRECT_URL = "redirect.url";
@Override protected void preHead(Page.HTML<_> html) {
String redirectUrl = $(REDIRECT_URL);
if (redirectUrl == null || redirectUrl.isEmpty()) {
set(TITLE, join("Logs for ", $(CONTAINER_ID)));
html.meta_http("refresh", "10");
} else {
if (redirectUrl.equals("false")) {
set(TITLE, join("Failed redirect for ", $(CONTAINER_ID)));
// Error getting the redirect URL. Fall through.
} else {
set(TITLE, join("Redirecting to log server for ", $(CONTAINER_ID)));
html.meta_http("refresh", "1; url=" + redirectUrl);
}
}
set(ACCORDION_ID, "nav");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
set(THEMESWITCHER_ID, "themeswitcher");
}
@Override
protected Class<? extends SubView> content() {
return ContainersLogsBlock.class;
}
public static class ContainersLogsBlock extends HtmlBlock implements
NMWebParams {
NMWebParams {
private final Configuration conf;
private final LocalDirAllocator logsSelector;
private final Context nmContext;
@ -72,13 +101,19 @@ public class ContainerLogsPage extends NMView {
@Override
protected void render(Block html) {
DIV<Hamlet> div = html.div("#content");
String redirectUrl = $(REDIRECT_URL);
if (redirectUrl !=null && redirectUrl.equals("false")) {
html.h1("Failed while trying to construct the redirect url to the log" +
" server. Log Server url may not be configured");
//Intentional fallthrough.
}
ContainerId containerId;
try {
containerId = ConverterUtils.toContainerId($(CONTAINER_ID));
} catch (IllegalArgumentException e) {
div.h1("Invalid containerId " + $(CONTAINER_ID))._();
html.h1("Invalid containerId " + $(CONTAINER_ID));
return;
}
@ -88,19 +123,28 @@ public class ContainerLogsPage extends NMView {
applicationId);
Container container = this.nmContext.getContainers().get(containerId);
if (application == null || container == null) {
div.h1(
"Unknown container. Container is either not yet running or "
if (application == null) {
html.h1(
"Unknown container. Container either has not started or "
+ "has already completed or "
+ "doesn't belong to this node at all.")._();
+ "doesn't belong to this node at all.");
return;
}
if (container == null) {
// Container may have already completed, but its logs may not be aggregated yet.
printLogs(html, containerId, applicationId, application);
return;
}
if (EnumSet.of(ContainerState.NEW, ContainerState.LOCALIZING,
ContainerState.LOCALIZING).contains(container.getContainerState())) {
div.h1("Container is not yet running. Current state is "
+ container.getContainerState())
._();
ContainerState.LOCALIZED).contains(container.getContainerState())) {
html.h1("Container is not yet running. Current state is "
+ container.getContainerState());
return;
}
if (container.getContainerState() == ContainerState.LOCALIZATION_FAILED) {
html.h1("Container wasn't started. Localization failed.");
return;
}
@ -108,103 +152,144 @@ public class ContainerLogsPage extends NMView {
ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_SUCCESS).contains(
container.getContainerState())) {
printLogs(html, containerId, applicationId, application);
return;
}
if (EnumSet.of(ContainerState.KILLING,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.CONTAINER_RESOURCES_CLEANINGUP).contains(
container.getContainerState())) {
// Container may have generated some logs before being killed.
printLogs(html, containerId, applicationId, application);
return;
}
if (container.getContainerState().equals(ContainerState.DONE)) {
// Previous state unknown; logs may be available.
printLogs(html, containerId, applicationId, application);
return;
} else {
html.h1("Container is no longer running...");
return;
}
}
// Check for the authorization.
String remoteUser = request().getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null && !this.aclsManager.checkAccess(callerUGI,
ApplicationAccessType.VIEW_APP, application.getUser(),
applicationId)) {
div.h1(
"You (User " + remoteUser
+ ") are not authorized to view the logs for application "
+ applicationId)._();
private void printLogs(Block html, ContainerId containerId,
ApplicationId applicationId, Application application) {
// Check for the authorization.
String remoteUser = request().getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null
&& !this.aclsManager.checkAccess(callerUGI,
ApplicationAccessType.VIEW_APP, application.getUser(),
applicationId)) {
html.h1(
"User [" + remoteUser
+ "] is not authorized to view the logs for application "
+ applicationId);
return;
}
if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
File logFile = null;
try {
logFile =
new File(this.logsSelector
.getLocalPathToRead(
ContainerLaunch.getRelativeContainerLogDir(
applicationId.toString(), containerId.toString())
+ Path.SEPARATOR + $(CONTAINER_LOG_TYPE), this.conf)
.toUri().getPath());
} catch (Exception e) {
html.h1("Cannot find this log on the local disk.");
return;
}
if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
File logFile = null;
try {
logFile =
new File(this.logsSelector
.getLocalPathToRead(
ConverterUtils.toString(
applicationId)
+ Path.SEPARATOR + $(CONTAINER_ID)
+ Path.SEPARATOR
+ $(CONTAINER_LOG_TYPE), this.conf).toUri()
.getPath());
} catch (Exception e) {
div.h1("Cannot find this log on the local disk.")._();
}
div.h1(logFile == null ? "Unknown LogFile" : logFile.getName());
long start =
$("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
start = start < 0 ? logFile.length() + start : start;
start = start < 0 ? 0 : start;
long end =
$("end").isEmpty() ? logFile.length() : Long
.parseLong($("end"));
end = end < 0 ? logFile.length() + end : end;
end = end < 0 ? logFile.length() : end;
if (start > end) {
writer().write("Invalid start and end values!");
} else {
long start =
$("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
start = start < 0 ? logFile.length() + start : start;
start = start < 0 ? 0 : start;
long end =
$("end").isEmpty() ? logFile.length() : Long.parseLong($("end"));
end = end < 0 ? logFile.length() + end : end;
end = end < 0 ? logFile.length() : end;
if (start > end) {
html.h1("Invalid start and end values. Start: [" + start + "]"
+ ", end[" + end + "]");
return;
} else {
InputStreamReader reader = null;
try {
long toRead = end - start;
if (toRead < logFile.length()) {
div._("Showing " + toRead + " bytes. Click ")
.a(url("containerlogs", $(CONTAINER_ID),
logFile.getName()), "here")
._(" for full log").br()._();
html.p()._("Showing " + toRead + " bytes. Click ")
.a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
logFile.getName(), "?start=0"), "here").
_(" for full log")._();
}
// TODO: Use secure IO Utils to avoid symlink attacks.
//TODO Fix findBugs close warning along with IOUtils change
FileReader reader = new FileReader(logFile);
char[] cbuf = new char[65536];
reader.skip(start);
int len = 0;
int totalRead = 0;
writer().write("<pre>");
while ((len = reader.read(cbuf, 0, (int) toRead)) > 0
&& totalRead < (end - start)) {
writer().write(cbuf, 0, len); // TODO: HTML quoting?
totalRead += len;
toRead = toRead - totalRead;
// TODO Fix findBugs close warning along with IOUtils change
reader = new FileReader(logFile);
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
long skipped = 0;
long totalSkipped = 0;
while (totalSkipped < start) {
skipped = reader.skip(start - totalSkipped);
totalSkipped += skipped;
}
int len = 0;
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
writer().write("<pre>");
while ((len = reader.read(cbuf, 0, currentToRead)) > 0
&& toRead > 0) {
writer().write(cbuf, 0, len); // TODO: HTML quoting?
toRead = toRead - len;
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
}
reader.close();
writer().write("</pre>");
} catch (IOException e) {
writer().write(
"Exception reading log-file "
+ StringUtils.stringifyException(e));
}
}
div._();
} else {
// Just print out the log-types
List<File> containerLogsDirs =
getContainerLogDirs(this.conf, containerId);
for (File containerLogsDir : containerLogsDirs) {
for (File logFile : containerLogsDir.listFiles()) {
div
.p()
.a(
url("containerlogs", $(CONTAINER_ID),
logFile.getName(), "?start=-4076"),
logFile.getName() + " : Total file length is "
+ logFile.length() + " bytes.")
._();
html.h1("Exception reading log-file. Log file was likely aggregated. "
+ StringUtils.stringifyException(e));
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
// Ignore
}
}
}
div._();
}
} else {
div.h1("Container is no longer running..")._();
// Just print out the log-types
List<File> containerLogsDirs =
getContainerLogDirs(this.conf, containerId);
boolean foundLogFile = false;
for (File containerLogsDir : containerLogsDirs) {
for (File logFile : containerLogsDir.listFiles()) {
foundLogFile = true;
html.p()
.a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
logFile.getName(), "?start=-4096"),
logFile.getName() + " : Total file length is "
+ logFile.length() + " bytes.")._();
}
}
if (!foundLogFile) {
html.h1("No logs available for container " + containerId.toString());
return;
}
}
return;
}
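// Editor's sketch (hedged; not part of the original patch): the start/end
// normalization used above. Negative values count back from the end of the
// file and still-negative results fall back to a bound, so "?start=-4096"
// selects the final 4 KB.
private static long normalizeOffset(long value, long fallback, long fileLen) {
long v = value < 0 ? fileLen + value : value;
return v < 0 ? fallback : v;
}
// e.g. long start = normalizeOffset(-4096, 0, logFile.length());
// long end = normalizeOffset(endParam, logFile.length(), logFile.length());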
static List<File>
@ -222,6 +307,5 @@ public class ContainerLogsPage extends NMView {
}
return containerLogDirs;
}
}
}

View File

@ -89,7 +89,8 @@ public class ContainerPage extends NMView implements NMWebParams {
._("User", container.getUser())
._("TotalMemoryNeeded",
container.getLaunchContext().getResource().getMemory())
._("logs", ujoin("containerlogs", $(CONTAINER_ID)), "Link to logs");
._("logs", ujoin("containerlogs", $(CONTAINER_ID), container.getUser()),
"Link to logs");
html._(InfoBlock.class);
}
}

View File

@ -21,15 +21,27 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.Controller;
import com.google.inject.Inject;
public class NMController extends Controller implements NMWebParams {
private Context nmContext;
private Configuration nmConf;
@Inject
public NMController(Configuration nmConf, RequestContext requestContext) {
public NMController(Configuration nmConf, RequestContext requestContext,
Context nmContext) {
super(requestContext);
this.nmContext = nmContext;
this.nmConf = nmConf;
}
@Override
@ -63,6 +75,29 @@ public class NMController extends Controller implements NMWebParams {
}
public void logs() {
String containerIdStr = $(CONTAINER_ID);
ContainerId containerId = null;
try {
containerId = ConverterUtils.toContainerId(containerIdStr);
} catch (IllegalArgumentException e) {
render(ContainerLogsPage.class);
return;
}
ApplicationId appId =
containerId.getApplicationAttemptId().getApplicationId();
Application app = nmContext.getApplications().get(appId);
if (app == null) {
String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
String redirectUrl = null;
if (logServerUrl == null || logServerUrl.isEmpty()) {
redirectUrl = "false";
} else {
redirectUrl =
url(logServerUrl, nmContext.getNodeId().toString(), containerIdStr,
containerIdStr, $(APP_OWNER));
}
set(ContainerLogsPage.REDIRECT_URL, redirectUrl);
}
render(ContainerLogsPage.class);
}
}
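// Editor's note (hedged): when the application is no longer known to this
// NodeManager, logs() above redirects to the aggregated-log server. Assuming
// Controller.url() slash-joins its parts, the target looks roughly like the
// following (host, ids, and owner are made-up values):
//
// http://historyserver:19888/jobhistory/logs/host1:4545
// /container_1234_0001_01_000001/container_1234_0001_01_000001/jdoe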

View File

@ -23,4 +23,6 @@ public interface NMWebParams {
String APPLICATION_ID = "nm.appId";
String CONTAINER_ID = "nm.containerId";
String CONTAINER_LOG_TYPE = "nm.containerLogType";
String ENTITY_STRING = "nm.entityString";
String APP_OWNER = "nm.appOwner";
}

View File

@ -99,7 +99,8 @@ public class WebServer extends AbstractService {
"application");
route(pajoin("/container", CONTAINER_ID), NMController.class,
"container");
route(pajoin("/containerlogs", CONTAINER_ID, CONTAINER_LOG_TYPE),
route(
pajoin("/containerlogs", CONTAINER_ID, APP_OWNER, CONTAINER_LOG_TYPE),
NMController.class, "logs");
}
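// Editor's note (hedged): with APP_OWNER added to the route, a container-log
// request now carries the owner in its path. A hypothetical matching URL
// (host, port, ids, owner, and log type are assumptions):
//
// http://nm-host:8042/node/containerlogs/container_1234_0001_01_000001/jdoe/stderr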

View File

@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -56,8 +57,6 @@ public class DummyContainerManager extends ContainerManagerImpl {
private static final Log LOG = LogFactory
.getLog(DummyContainerManager.class);
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
public DummyContainerManager(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@ -150,7 +149,8 @@ public class DummyContainerManager extends ContainerManagerImpl {
@Override
protected LogAggregationService createLogAggregationService(Context context,
DeletionService deletionService) {
return new LogAggregationService(context, deletionService) {
return new LogAggregationService(new AsyncDispatcher(), context,
deletionService) {
@Override
public void handle(LogAggregatorEvent event) {
switch (event.getType()) {

View File

@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -351,6 +352,7 @@ public class TestApplication {
final EventHandler<LogAggregatorEvent> logAggregationBus;
final String user;
final List<Container> containers;
final Context context;
final ApplicationId appId;
final Application app;
@ -373,11 +375,13 @@ public class TestApplication {
dispatcher.register(ContainerEventType.class, containerBus);
dispatcher.register(LogAggregatorEventType.class, logAggregationBus);
context = mock(Context.class);
this.user = user;
this.appId = BuilderUtils.newApplicationId(timestamp, id);
app = new ApplicationImpl(dispatcher, new ApplicationACLsManager(
new Configuration()), this.user, appId, null);
new Configuration()), this.user, appId, null, context);
containers = new ArrayList<Container>();
for (int i = 0; i < numContainers; i++) {
containers.add(createMockedContainer(this.appId, i));

View File

@ -18,6 +18,12 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
@ -28,8 +34,10 @@ import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import junit.framework.Assert;
@ -41,6 +49,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -50,11 +59,15 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
@ -62,12 +75,15 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@Ignore
//@Ignore
public class TestLogAggregationService extends BaseContainerManagerTest {
private Map<ApplicationAccessType, String> acls = createAppAcls();
static {
LOG = LogFactory.getLog(TestLogAggregationService.class);
}
@ -91,17 +107,25 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
}
@Test
@SuppressWarnings("unchecked")
public void testLocalFileDeletionAfterUpload() throws IOException {
this.delSrvc = new DeletionService(createContainerExecutor());
this.delSrvc.init(conf);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(this.context, this.delSrvc);
new LogAggregationService(dispatcher, this.context, this.delSrvc);
logAggregationService.init(this.conf);
logAggregationService.start();
ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
// AppLogDir should be created
@ -111,13 +135,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService
.handle(new LogAggregatorAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS));
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(application1);
appAttemptId.setAttemptId(1);
ContainerId container11 =
BuilderUtils.newContainerId(recordFactory, application1, appAttemptId, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(application1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1);
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11);
logAggregationService.handle(
@ -128,6 +150,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService.stop();
String containerIdStr = ConverterUtils.toString(container11);
File containerLogDir = new File(app1LogDir, containerIdStr);
for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
@ -136,17 +159,37 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
Assert.assertFalse(app1LogDir.exists());
Assert.assertTrue(new File(logAggregationService
.getRemoteNodeLogFileForApp(application1).toUri().getPath()).exists());
Path logFilePath =
logAggregationService.getRemoteNodeLogFileForApp(application1,
this.user);
Assert.assertTrue("Log file [" + logFilePath + "] not found", new File(
logFilePath.toUri().getPath()).exists());
dispatcher.await();
ArgumentCaptor<ApplicationEvent> eventCaptor =
ArgumentCaptor.forClass(ApplicationEvent.class);
verify(appEventHandler).handle(eventCaptor.capture());
assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
eventCaptor.getValue().getType());
assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue()
.getApplicationID());
}
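// Editor's note (hedged): DrainDispatcher.await() returns only once the event
// queue has drained, so the verification above cannot race the asynchronous
// APPLICATION_LOG_AGGREGATION_FINISHED event. The pattern, in brief:
//
// dispatcher.await();
// ArgumentCaptor<ApplicationEvent> captor =
// ArgumentCaptor.forClass(ApplicationEvent.class);
// verify(appEventHandler).handle(captor.capture());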
@Test
@SuppressWarnings("unchecked")
public void testNoContainerOnNode() {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(this.context, this.delSrvc);
new LogAggregationService(dispatcher, this.context, this.delSrvc);
logAggregationService.init(this.conf);
logAggregationService.start();
@ -159,27 +202,43 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService
.handle(new LogAggregatorAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS));
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
application1));
logAggregationService.stop();
Assert
.assertFalse(new File(logAggregationService
.getRemoteNodeLogFileForApp(application1).toUri().getPath())
.exists());
Assert.assertFalse(new File(logAggregationService
.getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath())
.exists());
dispatcher.await();
ArgumentCaptor<ApplicationEvent> eventCaptor =
ArgumentCaptor.forClass(ApplicationEvent.class);
verify(appEventHandler).handle(eventCaptor.capture());
assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
eventCaptor.getValue().getType());
verify(appEventHandler).handle(eventCaptor.capture());
assertEquals(application1, eventCaptor.getValue()
.getApplicationID());
}
@Test
@SuppressWarnings("unchecked")
public void testMultipleAppsLogAggregation() throws IOException {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
DrainDispatcher dispatcher = createDispatcher();
EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
dispatcher.register(ApplicationEventType.class, appEventHandler);
LogAggregationService logAggregationService =
new LogAggregationService(this.context, this.delSrvc);
new LogAggregationService(dispatcher, this.context, this.delSrvc);
logAggregationService.init(this.conf);
logAggregationService.start();
@ -192,81 +251,69 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
logAggregationService
.handle(new LogAggregatorAppStartedEvent(
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS));
ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
ApplicationAttemptId appAttemptId1 =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId1.setApplicationId(application1);
ContainerId container11 =
BuilderUtils.newContainerId(recordFactory, application1, appAttemptId1, 1);
ApplicationAttemptId appAttemptId1 =
BuilderUtils.newApplicationAttemptId(application1, 1);
ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container11, 0));
ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
ApplicationAttemptId appAttemptId2 =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId1.setApplicationId(application2);
ApplicationAttemptId appAttemptId2 =
BuilderUtils.newApplicationAttemptId(application2, 1);
File app2LogDir =
new File(localLogDir, ConverterUtils.toString(application2));
app2LogDir.mkdir();
logAggregationService.handle(new LogAggregatorAppStartedEvent(
application2, this.user, null,
ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY));
ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
ContainerId container21 =
BuilderUtils.newContainerId(recordFactory, application2,
appAttemptId2, 1);
ContainerId container21 = BuilderUtils.newContainerId(appAttemptId2, 1);
writeContainerLogs(app2LogDir, container21);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container21, 0));
ContainerId container12 =
BuilderUtils.newContainerId(recordFactory, application1, appAttemptId1,
2);
ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2);
writeContainerLogs(app1LogDir, container12);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container12, 0));
ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
ApplicationAttemptId appAttemptId3 =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId1.setApplicationId(application3);
ApplicationAttemptId appAttemptId3 =
BuilderUtils.newApplicationAttemptId(application3, 1);
File app3LogDir =
new File(localLogDir, ConverterUtils.toString(application3));
app3LogDir.mkdir();
logAggregationService.handle(new LogAggregatorAppStartedEvent(
application3, this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY));
logAggregationService.handle(new LogAggregatorAppStartedEvent(application3,
this.user, null,
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
ContainerId container31 =
BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3,
1);
ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
writeContainerLogs(app3LogDir, container31);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container31, 0));
ContainerId container32 =
BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3,
2);
ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2);
writeContainerLogs(app3LogDir, container32);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container32, 1)); // Failed
ContainerId container22 =
BuilderUtils.newContainerId(recordFactory, application2, appAttemptId2,
2);
ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2);
writeContainerLogs(app2LogDir, container22);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container22, 0));
ContainerId container33 =
BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3,
3);
ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3);
writeContainerLogs(app3LogDir, container33);
logAggregationService.handle(
new LogAggregatorContainerFinishedEvent(container33, 0));
@ -286,6 +333,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
new ContainerId[] { container21 });
verifyContainerLogs(logAggregationService, application3,
new ContainerId[] { container31, container32 });
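// Per the retention policies: application2 (APPLICATION_MASTER_ONLY) keeps
// only AM container21, and application3 (AM_AND_FAILED_CONTAINERS_ONLY) keeps
// AM container31 plus failed container32; container22 and container33 are dropped.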
dispatcher.await();
ArgumentCaptor<ApplicationEvent> eventCaptor =
ArgumentCaptor.forClass(ApplicationEvent.class);
verify(appEventHandler, times(3)).handle(eventCaptor.capture());
List<ApplicationEvent> capturedEvents = eventCaptor.getAllValues();
Set<ApplicationId> appIds = new HashSet<ApplicationId>();
for (ApplicationEvent cap : capturedEvents) {
assertEquals(ApplicationEventType.APPLICATION_LOG_AGGREGATION_FINISHED,
cap.getType());
appIds.add(cap.getApplicationID());
}
assertTrue(appIds.contains(application1));
assertTrue(appIds.contains(application2));
assertTrue(appIds.contains(application3));
}
private void writeContainerLogs(File appLogDir, ContainerId containerId)
@ -306,7 +369,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
ContainerId[] expectedContainerIds) throws IOException {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(this.conf,
logAggregationService.getRemoteNodeLogFileForApp(appId));
logAggregationService.getRemoteNodeLogFileForApp(appId, this.user));
Assert.assertEquals(this.user, reader.getApplicationOwner());
verifyAcls(reader.getApplicationAcls());
try {
Map<String, Map<String, String>> logMap =
new HashMap<String, Map<String, String>>();
@ -380,8 +447,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
public void testLogAggregationForRealContainerLaunch() throws IOException,
InterruptedException {
this.conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, true);
this.containerManager.start();
File scriptFile = new File(tmpDir, "scriptFile.sh");
PrintWriter fileWriter = new PrintWriter(scriptFile);
fileWriter.write("\necho Hello World! Stdout! > "
@ -400,13 +469,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(0);
appId.setId(0);
ApplicationAttemptId appAttemptId =
recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(1);
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
cId.setId(0);
cId.setApplicationAttemptId(appAttemptId);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 0);
containerLaunchContext.setContainerId(cId);
containerLaunchContext.setUser(this.user);
@ -446,4 +512,27 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
.asList(appId)));
this.containerManager.stop();
}
private void verifyAcls(Map<ApplicationAccessType, String> logAcls) {
Assert.assertEquals(this.acls.size(), logAcls.size());
for (ApplicationAccessType appAccessType : this.acls.keySet()) {
Assert.assertEquals(this.acls.get(appAccessType),
logAcls.get(appAccessType));
}
}
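// DrainDispatcher.await() blocks until every queued event has been handled,
// so mock verification after it is deterministic.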
private DrainDispatcher createDispatcher() {
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(this.conf);
dispatcher.start();
return dispatcher;
}
private Map<ApplicationAccessType, String> createAppAcls() {
Map<ApplicationAccessType, String> appAcls =
new HashMap<ApplicationAccessType, String>();
appAcls.put(ApplicationAccessType.MODIFY_APP, "user group");
appAcls.put(ApplicationAccessType.VIEW_APP, "*");
return appAcls;
}
}
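For reference, a minimal sketch (not part of this patch) of how a client could read back the owner and ACL metadata that LogAggregationService now records in the aggregated log file. The package path for AggregatedLogFormat and the helper class name are assumptions; the reader methods themselves appear in the test above.
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
// Assumed package at the time of this patch; the class may move in later releases.
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat;
public class AggregatedLogMetadataSketch {
public static void printOwnerAndAcls(Configuration conf, Path remoteNodeLogFile)
throws IOException {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(conf, remoteNodeLogFile);
try {
// Recorded from the user passed with LogAggregatorAppStartedEvent.
System.out.println("owner: " + reader.getApplicationOwner());
// Recorded from the application's ApplicationAccessType -> ACL map.
for (Map.Entry<ApplicationAccessType, String> acl
: reader.getApplicationAcls().entrySet()) {
System.out.println(acl.getKey() + " -> " + acl.getValue());
}
} finally {
reader.close();
}
}
}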

View File

@ -194,8 +194,13 @@ public class AMLauncher implements Runnable {
// consumable by the AM.
environment.put(ApplicationConstants.AM_CONTAINER_ID_ENV, container
.getContainerId().toString());
environment.put(ApplicationConstants.NM_HTTP_ADDRESS_ENV, application
.getMasterContainer().getNodeHttpAddress());
environment.put(ApplicationConstants.NM_HOST_ENV, application
.getMasterContainer().getNodeId().getHost());
environment.put(ApplicationConstants.NM_PORT_ENV,
String.valueOf(application.getMasterContainer().getNodeId().getPort()));
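// getNodeHttpAddress() has the form "host:port"; only the port is exported.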
String[] parts =
application.getMasterContainer().getNodeHttpAddress().split(":");
environment.put(ApplicationConstants.NM_HTTP_PORT_ENV, parts[1]);
environment.put(
ApplicationConstants.APP_SUBMIT_TIME_ENV,
String.valueOf(rmContext.getRMApps()

View File

@ -54,8 +54,8 @@ public class MockNM {
return nodeId;
}
public String getHttpAddress() {
return nodeId.getHost() + ":" + String.valueOf(httpPort);
public int getHttpPort() {
return httpPort;
}
public void containerStatus(Container container) throws Exception {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -60,7 +61,9 @@ public class TestApplicationMasterLauncher {
boolean cleanedup = false;
String attemptIdAtContainerManager = null;
String containerIdAtContainerManager = null;
String nmAddressAtContainerManager = null;
String nmHostAtContainerManager = null;
int nmPortAtContainerManager;
int nmHttpPortAtContainerManager;
long submitTimeAtContainerManager;
@Override
@ -69,20 +72,22 @@ public class TestApplicationMasterLauncher {
throws YarnRemoteException {
LOG.info("Container started by MyContainerManager: " + request);
launched = true;
Map<String, String> env =
request.getContainerLaunchContext().getEnvironment();
containerIdAtContainerManager =
request.getContainerLaunchContext().getEnvironment()
.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
env.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
ContainerId containerId =
ConverterUtils.toContainerId(containerIdAtContainerManager);
attemptIdAtContainerManager =
containerId.getApplicationAttemptId().toString();
nmAddressAtContainerManager =
request.getContainerLaunchContext().getEnvironment()
.get(ApplicationConstants.NM_HTTP_ADDRESS_ENV);
nmHostAtContainerManager = env.get(ApplicationConstants.NM_HOST_ENV);
nmPortAtContainerManager =
Integer.parseInt(env.get(ApplicationConstants.NM_PORT_ENV));
nmHttpPortAtContainerManager =
Integer.parseInt(env.get(ApplicationConstants.NM_HTTP_PORT_ENV));
submitTimeAtContainerManager =
Long.parseLong(request.getContainerLaunchContext().getEnvironment()
.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
return null;
}
@ -168,8 +173,12 @@ public class TestApplicationMasterLauncher {
Assert.assertEquals(app.getRMAppAttempt(appAttemptId)
.getSubmissionContext().getAMContainerSpec().getContainerId()
.toString(), containerManager.containerIdAtContainerManager);
Assert.assertEquals(nm1.getHttpAddress(),
containerManager.nmAddressAtContainerManager);
Assert.assertEquals(nm1.getNodeId().getHost(),
containerManager.nmHostAtContainerManager);
Assert.assertEquals(nm1.getNodeId().getPort(),
containerManager.nmPortAtContainerManager);
Assert.assertEquals(nm1.getHttpPort(),
containerManager.nmHttpPortAtContainerManager);
MockAM am = new MockAM(rm.getRMContext(), rm
.getApplicationMasterService(), appAttemptId);

View File

@ -325,9 +325,9 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
* When the ApplicationMaster starts up, several parameters are made available
to it via the environment. These include the ContainerId for the
ApplicationMaster container, the application submission time and the HTTP
address of the NodeManager running the container. Ref ApplicationConstants
for parameter names.
ApplicationMaster container, the application submission time and details
about the NodeManager host running the Application Master.
Ref ApplicationConstants for parameter names; a short sketch of reading
these values appears after this hunk.
* All interactions with the ResourceManager require an ApplicationAttemptId
(there can be multiple attempts per application in case of failures). The
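For illustration, a hedged sketch (not part of this patch) of an ApplicationMaster reading these values on startup. The constant names come from the AMLauncher change above; the class name is hypothetical.
import java.util.Map;
import org.apache.hadoop.yarn.api.ApplicationConstants;
public class AMEnvironmentSketch {
public static void main(String[] args) {
Map<String, String> env = System.getenv();
// All five values below are exported by AMLauncher into the AM container's
// environment.
String containerId = env.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
String nmHost = env.get(ApplicationConstants.NM_HOST_ENV);
int nmPort = Integer.parseInt(env.get(ApplicationConstants.NM_PORT_ENV));
int nmHttpPort =
Integer.parseInt(env.get(ApplicationConstants.NM_HTTP_PORT_ENV));
long submitTime =
Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
System.out.println("AM container " + containerId + " on " + nmHost + ":"
+ nmPort + " (NM http port " + nmHttpPort + "), app submitted at "
+ submitTime);
}
}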

View File

@ -265,6 +265,11 @@
<version>4.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>

View File

@ -156,6 +156,11 @@
<artifactId>netty</artifactId>
<version>3.2.3.Final</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.cenqua.clover</groupId>
<artifactId>clover</artifactId>