YARN-649. Added a new NM web-service to serve container logs in plain text over HTTP. Contributed by Sandy Ryza.

svn merge --ignore-ancestry -c 1519326 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1519327 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-09-02 00:09:53 +00:00
parent fa39efe421
commit 0246b60e93
17 changed files with 508 additions and 289 deletions

View File

@ -6,6 +6,9 @@ Release 2.3.0 - UNRELEASED
NEW FEATURES
YARN-649. Added a new NM web-service to serve container logs in plain text
over HTTP. (Sandy Ryza via vinodkv)
IMPROVEMENTS
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
/**
* Context interface for sharing information across components in the
@ -61,4 +62,8 @@ public interface Context {
NodeHealthStatus getNodeHealthStatus();
ContainerManagementProtocol getContainerManager();
LocalDirsHandlerService getLocalDirsHandler();
ApplicationACLsManager getApplicationACLsManager();
}

View File

@ -123,7 +123,8 @@ public class NodeManager extends CompositeService
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
return new NMContext(containerTokenSecretManager, nmTokenSecretManager);
return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
dirsHandler, aclsManager);
}
protected void doSecureLogin() throws IOException {
@ -142,9 +143,6 @@ public class NodeManager extends CompositeService
NMTokenSecretManagerInNM nmTokenSecretManager =
new NMTokenSecretManagerInNM();
this.context =
createNMContext(containerTokenSecretManager, nmTokenSecretManager);
this.aclsManager = new ApplicationACLsManager(conf);
ContainerExecutor exec = ReflectionUtils.newInstance(
@ -165,7 +163,9 @@ public class NodeManager extends CompositeService
addService(nodeHealthChecker);
dirsHandler = nodeHealthChecker.getDiskHandler();
this.context = createNMContext(containerTokenSecretManager,
nmTokenSecretManager);
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
@ -319,14 +319,19 @@ public class NodeManager extends CompositeService
private final NMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInNM nmTokenSecretManager;
private ContainerManagementProtocol containerManager;
private final LocalDirsHandlerService dirsHandler;
private final ApplicationACLsManager aclsManager;
private WebServer webServer;
private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager) {
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.dirsHandler = dirsHandler;
this.aclsManager = aclsManager;
this.nodeHealthStatus.setIsNodeHealthy(true);
this.nodeHealthStatus.setHealthReport("Healthy");
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
@ -386,6 +391,16 @@ public class NodeManager extends CompositeService
public void setNodeId(NodeId nodeId) {
this.nodeId = nodeId;
}
@Override
public LocalDirsHandlerService getLocalDirsHandler() {
return dirsHandler;
}
@Override
public ApplicationACLsManager getApplicationACLsManager() {
return aclsManager;
}
}

View File

@ -468,8 +468,7 @@ public class ContainerManagerImpl extends CompositeService implements
// Create the application
Application application =
new ApplicationImpl(dispatcher, this.aclsManager, user, applicationID,
credentials, context);
new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app " + applicationID);

View File

@ -73,14 +73,13 @@ public class ApplicationImpl implements Application {
Map<ContainerId, Container> containers =
new HashMap<ContainerId, Container>();
public ApplicationImpl(Dispatcher dispatcher,
ApplicationACLsManager aclsManager, String user, ApplicationId appId,
public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId,
Credentials credentials, Context context) {
this.dispatcher = dispatcher;
this.user = user;
this.appId = appId;
this.credentials = credentials;
this.aclsManager = aclsManager;
this.aclsManager = context.getApplicationACLsManager();
this.context = context;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();

View File

@ -28,36 +28,21 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
import org.mortbay.log.Log;
import com.google.inject.Inject;
@ -90,19 +75,11 @@ public class ContainerLogsPage extends NMView {
public static class ContainersLogsBlock extends HtmlBlock implements
YarnWebParams {
private final Configuration conf;
private final Context nmContext;
private final ApplicationACLsManager aclsManager;
private final LocalDirsHandlerService dirsHandler;
@Inject
public ContainersLogsBlock(Configuration conf, Context context,
ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
this.conf = conf;
public ContainersLogsBlock(Context context) {
this.nmContext = context;
this.aclsManager = aclsManager;
this.dirsHandler = dirsHandler;
}
@Override
@ -114,229 +91,123 @@ public class ContainerLogsPage extends NMView {
" server. Log Server url may not be configured");
//Intentional fallthrough.
}
ContainerId containerId;
try {
containerId = ConverterUtils.toContainerId($(CONTAINER_ID));
} catch (IllegalArgumentException e) {
html.h1("Invalid containerId " + $(CONTAINER_ID));
} catch (IllegalArgumentException ex) {
html.h1("Invalid container ID: " + $(CONTAINER_ID));
return;
}
ApplicationId applicationId = containerId.getApplicationAttemptId()
.getApplicationId();
Application application = this.nmContext.getApplications().get(
applicationId);
Container container = this.nmContext.getContainers().get(containerId);
if (application == null) {
html.h1(
"Unknown container. Container either has not started or "
+ "has already completed or "
+ "doesn't belong to this node at all.");
return;
}
if (container == null) {
// Container may have alerady completed, but logs not aggregated yet.
printLogs(html, containerId, applicationId, application);
return;
}
if (EnumSet.of(ContainerState.NEW, ContainerState.LOCALIZING,
ContainerState.LOCALIZED).contains(container.getContainerState())) {
html.h1("Container is not yet running. Current state is "
+ container.getContainerState());
return;
}
if (container.getContainerState() == ContainerState.LOCALIZATION_FAILED) {
html.h1("Container wasn't started. Localization failed.");
return;
}
if (EnumSet.of(ContainerState.RUNNING,
ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_SUCCESS).contains(
container.getContainerState())) {
printLogs(html, containerId, applicationId, application);
return;
}
if (EnumSet.of(ContainerState.KILLING,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.CONTAINER_RESOURCES_CLEANINGUP).contains(
container.getContainerState())) {
//Container may have generated some logs before being killed.
printLogs(html, containerId, applicationId, application);
return;
}
if (container.getContainerState().equals(ContainerState.DONE)) {
// Prev state unknown. Logs may be available.
printLogs(html, containerId, applicationId, application);
return;
} else {
html.h1("Container is no longer running...");
return;
}
}
private void printLogs(Block html, ContainerId containerId,
ApplicationId applicationId, Application application) {
// Check for the authorization.
String remoteUser = request().getRemoteUser();
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null
&& !this.aclsManager.checkAccess(callerUGI,
ApplicationAccessType.VIEW_APP, application.getUser(),
applicationId)) {
html.h1(
"User [" + remoteUser
+ "] is not authorized to view the logs for application "
+ applicationId);
return;
}
if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
File logFile = null;
try {
URI logPathURI = new URI(this.dirsHandler.getLogPathToRead(
ContainerLaunch.getRelativeContainerLogDir(
applicationId.toString(), containerId.toString())
+ Path.SEPARATOR + $(CONTAINER_LOG_TYPE)).toString());
logFile = new File(logPathURI.getPath());
} catch (URISyntaxException e) {
html.h1("Cannot find this log on the local disk.");
return;
} catch (Exception e) {
html.h1("Cannot find this log on the local disk.");
return;
}
long start =
$("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
start = start < 0 ? logFile.length() + start : start;
start = start < 0 ? 0 : start;
long end =
$("end").isEmpty() ? logFile.length() : Long.parseLong($("end"));
end = end < 0 ? logFile.length() + end : end;
end = end < 0 ? logFile.length() : end;
if (start > end) {
html.h1("Invalid start and end values. Start: [" + start + "]"
+ ", end[" + end + "]");
return;
try {
if ($(CONTAINER_LOG_TYPE).isEmpty()) {
List<File> logFiles = ContainerLogsUtils.getContainerLogDirs(containerId,
request().getRemoteUser(), nmContext);
printLogFileDirectory(html, logFiles);
} else {
FileInputStream logByteStream = null;
try {
logByteStream =
SecureIOUtils.openForRead(logFile, application.getUser(), null);
} catch (IOException e) {
LOG.error(
"Exception reading log file " + logFile.getAbsolutePath(), e);
if (e.getMessage().contains(
"did not match expected owner '" + application.getUser()
+ "'")) {
html.h1("Exception reading log file. Application submitted by '"
+ application.getUser()
+ "' doesn't own requested log file : "
+ logFile.getName());
} else {
html.h1("Exception reading log file. It might be because log "
+ "file was aggregated : " + logFile.getName());
}
return;
}
try {
long toRead = end - start;
if (toRead < logFile.length()) {
html.p()._("Showing " + toRead + " bytes. Click ")
.a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
logFile.getName(), "?start=0"), "here").
_(" for full log")._();
}
// TODO Fix findBugs close warning along with IOUtils change
IOUtils.skipFully(logByteStream, start);
InputStreamReader reader = new InputStreamReader(logByteStream);
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
int len = 0;
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
PRE<Hamlet> pre = html.pre();
while ((len = reader.read(cbuf, 0, currentToRead)) > 0
&& toRead > 0) {
pre._(new String(cbuf, 0, len));
toRead = toRead - len;
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
}
pre._();
reader.close();
} catch (IOException e) {
LOG.error(
"Exception reading log file " + logFile.getAbsolutePath(), e);
html.h1("Exception reading log file. It might be because log "
+ "file was aggregated : " + logFile.getName());
} finally {
if (logByteStream != null) {
try {
logByteStream.close();
} catch (IOException e) {
// Ignore
}
}
}
File logFile = ContainerLogsUtils.getContainerLogFile(containerId,
$(CONTAINER_LOG_TYPE), request().getRemoteUser(), nmContext);
printLogFile(html, logFile);
}
} catch (YarnException ex) {
html.h1(ex.getMessage());
} catch (NotFoundException ex) {
html.h1(ex.getMessage());
}
}
private void printLogFile(Block html, File logFile) {
long start =
$("start").isEmpty() ? -4 * 1024 : Long.parseLong($("start"));
start = start < 0 ? logFile.length() + start : start;
start = start < 0 ? 0 : start;
long end =
$("end").isEmpty() ? logFile.length() : Long.parseLong($("end"));
end = end < 0 ? logFile.length() + end : end;
end = end < 0 ? logFile.length() : end;
if (start > end) {
html.h1("Invalid start and end values. Start: [" + start + "]"
+ ", end[" + end + "]");
return;
} else {
// Print out log types in lexical order
List<File> containerLogsDirs = getContainerLogDirs(containerId,
dirsHandler);
Collections.sort(containerLogsDirs);
boolean foundLogFile = false;
for (File containerLogsDir : containerLogsDirs) {
File[] logFiles = containerLogsDir.listFiles();
if (logFiles != null) {
Arrays.sort(logFiles);
for (File logFile : logFiles) {
foundLogFile = true;
html.p()
.a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
logFile.getName(), "?start=-4096"),
logFile.getName() + " : Total file length is "
+ logFile.length() + " bytes.")._();
}
}
}
if (!foundLogFile) {
html.h1("No logs available for container " + containerId.toString());
FileInputStream logByteStream = null;
try {
logByteStream = ContainerLogsUtils.openLogFileForRead($(CONTAINER_ID),
logFile, nmContext);
} catch (IOException ex) {
html.h1(ex.getMessage());
return;
}
}
return;
}
static List<File> getContainerLogDirs(ContainerId containerId,
LocalDirsHandlerService dirsHandler) {
List<String> logDirs = dirsHandler.getLogDirs();
List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
for (String logDir : logDirs) {
try {
logDir = new URI(logDir).getPath();
} catch (URISyntaxException e) {
Log.warn(e.getMessage());
long toRead = end - start;
if (toRead < logFile.length()) {
html.p()._("Showing " + toRead + " bytes. Click ")
.a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
logFile.getName(), "?start=0"), "here").
_(" for full log")._();
}
IOUtils.skipFully(logByteStream, start);
InputStreamReader reader = new InputStreamReader(logByteStream);
int bufferSize = 65536;
char[] cbuf = new char[bufferSize];
int len = 0;
int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
PRE<Hamlet> pre = html.pre();
while ((len = reader.read(cbuf, 0, currentToRead)) > 0
&& toRead > 0) {
pre._(new String(cbuf, 0, len));
toRead = toRead - len;
currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
}
pre._();
reader.close();
} catch (IOException e) {
LOG.error(
"Exception reading log file " + logFile.getAbsolutePath(), e);
html.h1("Exception reading log file. It might be because log "
+ "file was aggregated : " + logFile.getName());
} finally {
if (logByteStream != null) {
try {
logByteStream.close();
} catch (IOException e) {
// Ignore
}
}
}
String appIdStr = ConverterUtils.toString(containerId
.getApplicationAttemptId().getApplicationId());
File appLogDir = new File(logDir, appIdStr);
String containerIdStr = ConverterUtils.toString(containerId);
containerLogDirs.add(new File(appLogDir, containerIdStr));
}
return containerLogDirs;
}
private void printLogFileDirectory(Block html, List<File> containerLogsDirs) {
// Print out log types in lexical order
Collections.sort(containerLogsDirs);
boolean foundLogFile = false;
for (File containerLogsDir : containerLogsDirs) {
File[] logFiles = containerLogsDir.listFiles();
if (logFiles != null) {
Arrays.sort(logFiles);
for (File logFile : logFiles) {
foundLogFile = true;
html.p()
.a(url("containerlogs", $(CONTAINER_ID), $(APP_OWNER),
logFile.getName(), "?start=-4096"),
logFile.getName() + " : Total file length is "
+ logFile.length() + " bytes.")._();
}
}
}
if (!foundLogFile) {
html.h1("No logs available for container " + $(CONTAINER_ID));
return;
}
}
}
}

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Contains utilities for fetching a user's log file in a secure fashion.
*/
public class ContainerLogsUtils {
public static final Logger LOG = LoggerFactory.getLogger(ContainerLogsUtils.class);
/**
* Finds the local directories that logs for the given container are stored
* on.
*/
public static List<File> getContainerLogDirs(ContainerId containerId,
String remoteUser, Context context) throws YarnException {
Container container = context.getContainers().get(containerId);
if (container == null) {
throw new YarnException("Container does not exist.");
}
Application application = getApplicationForContainer(containerId, context);
checkAccess(remoteUser, application, context);
checkState(container.getContainerState());
return getContainerLogDirs(containerId, context.getLocalDirsHandler());
}
static List<File> getContainerLogDirs(ContainerId containerId,
LocalDirsHandlerService dirsHandler) throws YarnException {
List<String> logDirs = dirsHandler.getLogDirs();
List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
for (String logDir : logDirs) {
try {
logDir = new URI(logDir).getPath();
} catch (URISyntaxException e) {
throw new YarnException("Internal error", e);
}
String appIdStr = ConverterUtils.toString(containerId
.getApplicationAttemptId().getApplicationId());
File appLogDir = new File(logDir, appIdStr);
containerLogDirs.add(new File(appLogDir, containerId.toString()));
}
return containerLogDirs;
}
/**
* Finds the log file with the given filename for the given container.
*/
public static File getContainerLogFile(ContainerId containerId,
String fileName, String remoteUser, Context context) throws YarnException {
Container container = context.getContainers().get(containerId);
if (container == null) {
throw new NotFoundException("Container with id " + containerId
+ " not found.");
}
Application application = getApplicationForContainer(containerId, context);
checkAccess(remoteUser, application, context);
checkState(container.getContainerState());
try {
LocalDirsHandlerService dirsHandler = context.getLocalDirsHandler();
String relativeContainerLogDir = ContainerLaunch.getRelativeContainerLogDir(
application.getAppId().toString(), containerId.toString());
Path logPath = dirsHandler.getLogPathToRead(
relativeContainerLogDir + Path.SEPARATOR + fileName);
URI logPathURI = new URI(logPath.toString());
File logFile = new File(logPathURI.getPath());
return logFile;
} catch (URISyntaxException e) {
throw new YarnException("Internal error", e);
} catch (IOException e) {
LOG.warn("Failed to find log file", e);
throw new NotFoundException("Cannot find this log on the local disk.");
}
}
private static Application getApplicationForContainer(ContainerId containerId,
Context context) {
ApplicationId applicationId = containerId.getApplicationAttemptId()
.getApplicationId();
Application application = context.getApplications().get(
applicationId);
if (application == null) {
throw new NotFoundException(
"Unknown container. Container either has not started or "
+ "has already completed or "
+ "doesn't belong to this node at all.");
}
return application;
}
private static void checkAccess(String remoteUser, Application application,
Context context) throws YarnException {
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null
&& !context.getApplicationACLsManager().checkAccess(callerUGI,
ApplicationAccessType.VIEW_APP, application.getUser(),
application.getAppId())) {
throw new YarnException(
"User [" + remoteUser
+ "] is not authorized to view the logs for application "
+ application.getAppId());
}
}
private static void checkState(ContainerState state) {
if (state == ContainerState.NEW || state == ContainerState.LOCALIZING ||
state == ContainerState.LOCALIZED) {
throw new NotFoundException("Container is not yet running. Current state is "
+ state);
}
if (state == ContainerState.LOCALIZATION_FAILED) {
throw new NotFoundException("Container wasn't started. Localization failed.");
}
}
public static FileInputStream openLogFileForRead(String containerIdStr, File logFile,
Context context) throws IOException {
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationId applicationId = containerId.getApplicationAttemptId()
.getApplicationId();
String user = context.getApplications().get(
applicationId).getUser();
try {
return SecureIOUtils.openForRead(logFile, user, null);
} catch (IOException e) {
if (e.getMessage().contains(
"did not match expected owner '" + user
+ "'")) {
LOG.error(
"Exception reading log file " + logFile.getAbsolutePath(), e);
throw new IOException("Exception reading log file. Application submitted by '"
+ user
+ "' doesn't own requested log file : "
+ logFile.getName(), e);
} else {
throw new IOException("Exception reading log file. It might be because log "
+ "file was aggregated : " + logFile.getName(), e);
}
}
}
}

View File

@ -17,19 +17,31 @@
package org.apache.hadoop.yarn.server.nodemanager.webapp;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map.Entry;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -59,6 +71,9 @@ public class NMWebServices {
private static RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private @javax.ws.rs.core.Context
HttpServletRequest request;
private @javax.ws.rs.core.Context
HttpServletResponse response;
@ -179,5 +194,66 @@ public class NMWebServices {
.toString(), webapp.name());
}
/**
* Returns the contents of a container's log file in plain text.
*
* Only works for containers that are still in the NodeManager's memory, so
* logs are no longer available after the corresponding application is no
* longer running.
*
* @param containerIdStr
* The container ID
* @param filename
* The name of the log file
* @return
* The contents of the container's log file
*/
@GET
@Path("/containerlogs/{containerid}/{filename}")
@Produces({ MediaType.TEXT_PLAIN })
@Public
@Unstable
public Response getLogs(@PathParam("containerid") String containerIdStr,
@PathParam("filename") String filename) {
ContainerId containerId;
try {
containerId = ConverterUtils.toContainerId(containerIdStr);
} catch (IllegalArgumentException ex) {
return Response.status(Status.BAD_REQUEST).build();
}
File logFile = null;
try {
logFile = ContainerLogsUtils.getContainerLogFile(
containerId, filename, request.getRemoteUser(), nmContext);
} catch (NotFoundException ex) {
return Response.status(Status.NOT_FOUND).entity(ex.getMessage()).build();
} catch (YarnException ex) {
return Response.serverError().entity(ex.getMessage()).build();
}
try {
final FileInputStream fis = ContainerLogsUtils.openLogFileForRead(
containerIdStr, logFile, nmContext);
StreamingOutput stream = new StreamingOutput() {
@Override
public void write(OutputStream os) throws IOException,
WebApplicationException {
int bufferSize = 65536;
byte[] buf = new byte[bufferSize];
int len;
while ((len = fis.read(buf, 0, bufferSize)) > 0) {
os.write(buf, 0, len);
}
os.flush();
}
};
return Response.ok(stream).build();
} catch (IOException ex) {
return Response.serverError().entity(ex.getMessage()).build();
}
}
}

View File

@ -79,7 +79,7 @@ public class TestEventFlow {
YarnConfiguration conf = new YarnConfiguration();
Context context = new NMContext(new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM()) {
new NMTokenSecretManagerInNM(), null, null) {
@Override
public int getHttpPort() {
return 1234;

View File

@ -1185,7 +1185,7 @@ public class TestNodeStatusUpdater {
public MyNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
super(containerTokenSecretManager, nmTokenSecretManager);
super(containerTokenSecretManager, nmTokenSecretManager, null, null);
}
@Override

View File

@ -100,7 +100,7 @@ public abstract class BaseContainerManagerTest {
protected static final int HTTP_PORT = 5412;
protected Configuration conf = new YarnConfiguration();
protected Context context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM()) {
conf), new NMTokenSecretManagerInNM(), null, new ApplicationACLsManager(conf)) {
public int getHttpPort() {
return HTTP_PORT;
};

View File

@ -490,6 +490,8 @@ public class TestApplication {
when(context.getContainerTokenSecretManager()).thenReturn(
new NMContainerTokenSecretManager(conf));
when(context.getApplicationACLsManager()).thenReturn(
new ApplicationACLsManager(conf));
// Setting master key
MasterKey masterKey = new MasterKeyPBImpl();
@ -501,8 +503,7 @@ public class TestApplication {
this.user = user;
this.appId = BuilderUtils.newApplicationId(timestamp, id);
app = new ApplicationImpl(dispatcher, new ApplicationACLsManager(
new Configuration()), this.user, appId, null, context);
app = new ApplicationImpl(dispatcher, this.user, appId, null, context);
containers = new ArrayList<Container>();
for (int i = 0; i < numContainers; i++) {
Container container = createMockedContainer(this.appId, i);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
@ -63,7 +63,7 @@ import com.google.inject.Module;
public class TestContainerLogsPage {
@Test(timeout=30000)
public void testContainerLogDirs() throws IOException {
public void testContainerLogDirs() throws IOException, YarnException {
File absLogDir = new File("target",
TestNMWebServer.class.getSimpleName() + "LogDir").getAbsoluteFile();
String logdirwithFile = absLogDir.toURI().toString();
@ -86,7 +86,7 @@ public class TestContainerLogsPage {
ContainerId container1 = BuilderUtils.newContainerId(recordFactory, appId,
appAttemptId, 0);
List<File> files = null;
files = ContainerLogsPage.ContainersLogsBlock.getContainerLogDirs(
files = ContainerLogsUtils.getContainerLogDirs(
container1, dirsHandler);
Assert.assertTrue(!(files.get(0).toString().contains("file:")));
}
@ -146,8 +146,6 @@ public class TestContainerLogsPage {
out.write("Log file Content".getBytes());
out.close();
ApplicationACLsManager aclsManager = mock(ApplicationACLsManager.class);
Context context = mock(Context.class);
ConcurrentMap<ApplicationId, Application> appMap =
new ConcurrentHashMap<ApplicationId, Application>();
@ -157,7 +155,7 @@ public class TestContainerLogsPage {
new ConcurrentHashMap<ContainerId, Container>());
ContainersLogsBlock cLogsBlock =
new ContainersLogsBlock(conf, context, aclsManager, dirsHandler);
new ContainersLogsBlock(context);
Map<String, String> params = new HashMap<String, String>();
params.put(YarnWebParams.CONTAINER_ID, container1.toString());

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -76,7 +77,7 @@ public class TestNMWebServer {
}
private int startNMWebAppServer(String webAddr) {
Context nmContext = new NodeManager.NMContext(null, null);
Context nmContext = new NodeManager.NMContext(null, null, null, null);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@ -133,8 +134,8 @@ public class TestNMWebServer {
}
@Test
public void testNMWebApp() throws IOException {
Context nmContext = new NodeManager.NMContext(null, null);
public void testNMWebApp() throws IOException, YarnException {
Context nmContext = new NodeManager.NMContext(null, null, null, null);
ResourceView resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@ -219,10 +220,10 @@ public class TestNMWebServer {
private void writeContainerLogs(Context nmContext,
ContainerId containerId, LocalDirsHandlerService dirsHandler)
throws IOException {
throws IOException, YarnException {
// ContainerLogDir should be created
File containerLogDir =
ContainerLogsPage.ContainersLogsBlock.getContainerLogDirs(containerId,
ContainerLogsUtils.getContainerLogDirs(containerId,
dirsHandler).get(0);
containerLogDir.mkdirs();
for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {

View File

@ -23,24 +23,38 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import javax.ws.rs.core.MediaType;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
@ -86,7 +100,14 @@ public class TestNMWebServices extends JerseyTest {
private Injector injector = Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
nmContext = new NodeManager.NMContext(null, null);
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {
@ -110,13 +131,6 @@ public class TestNMWebServices extends JerseyTest {
return true;
}
};
Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
bind(JAXBContextResolver.class);
bind(NMWebServices.class);
@ -292,6 +306,53 @@ public class TestNMWebServices extends JerseyTest {
assertEquals("incorrect number of elements", 1, nodes.getLength());
verifyNodesXML(nodes);
}
@Test
public void testContainerLogs() throws IOException {
WebResource r = resource();
final ContainerId containerId = BuilderUtils.newContainerId(0, 0, 0, 0);
final String containerIdStr = BuilderUtils.newContainerId(0, 0, 0, 0)
.toString();
final ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
final ApplicationId appId = appAttemptId.getApplicationId();
final String appIdStr = appId.toString();
final String filename = "logfile1";
final String logMessage = "log message\n";
nmContext.getApplications().put(appId, new ApplicationImpl(null, "user",
appId, null, nmContext));
MockContainer container = new MockContainer(appAttemptId,
new AsyncDispatcher(), new Configuration(), "user", appId, 1);
container.setState(ContainerState.RUNNING);
nmContext.getContainers().put(containerId, container);
// write out log file
Path path = dirsHandler.getLogPathForWrite(
ContainerLaunch.getRelativeContainerLogDir(
appIdStr, containerIdStr) + "/" + filename, false);
File logFile = new File(path.toUri().getPath());
logFile.deleteOnExit();
assertTrue("Failed to create log dir", logFile.getParentFile().mkdirs());
PrintWriter pw = new PrintWriter(logFile);
pw.print(logMessage);
pw.close();
// ask for it
ClientResponse response = r.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path(filename)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertEquals(logMessage, responseText);
// ask for file that doesn't exist
response = r.path("ws").path("v1").path("node")
.path("containerlogs").path(containerIdStr).path("uhhh")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
Assert.assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Cannot find this log on the local disk."));
}
public void verifyNodesXML(NodeList nodes) throws JSONException, Exception {
for (int i = 0; i < nodes.getLength(); i++) {

View File

@ -93,7 +93,13 @@ public class TestNMWebServicesApps extends JerseyTest {
private Injector injector = Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
nmContext = new NodeManager.NMContext(null, null);
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
((NodeManager.NMContext)nmContext).setNodeId(nodeId);
resourceView = new ResourceView() {
@ -119,12 +125,6 @@ public class TestNMWebServicesApps extends JerseyTest {
return true;
}
};
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
bind(JAXBContextResolver.class);
bind(NMWebServices.class);

View File

@ -93,15 +93,6 @@ public class TestNMWebServicesContainers extends JerseyTest {
private Injector injector = Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
nmContext = new NodeManager.NMContext(null, null) {
public NodeId getNodeId() {
return NodeId.newInstance("testhost.foo.com", 8042);
};
public int getHttpPort() {
return 1234;
};
};
resourceView = new ResourceView() {
@Override
public long getVmemAllocatedForContainers() {
@ -131,6 +122,15 @@ public class TestNMWebServicesContainers extends JerseyTest {
healthChecker.init(conf);
dirsHandler = healthChecker.getDiskHandler();
aclsManager = new ApplicationACLsManager(conf);
nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager) {
public NodeId getNodeId() {
return NodeId.newInstance("testhost.foo.com", 8042);
};
public int getHttpPort() {
return 1234;
};
};
nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
bind(JAXBContextResolver.class);
bind(NMWebServices.class);