YARN-10026. Pull out common code pieces from ATS v1.5 and v2. Contributed by Adam Antal
This commit is contained in:
parent
768ee22e9e
commit
dd2607e3ec
|
@ -32,25 +32,18 @@ import javax.ws.rs.QueryParam;
|
||||||
import javax.ws.rs.core.Context;
|
import javax.ws.rs.core.Context;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import javax.ws.rs.core.Response.ResponseBuilder;
|
|
||||||
import javax.ws.rs.core.Response.Status;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.sun.jersey.api.client.ClientHandlerException;
|
|
||||||
import com.sun.jersey.api.client.UniformInterfaceException;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.http.JettyUtils;
|
import org.apache.hadoop.http.JettyUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
||||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
import org.apache.hadoop.yarn.server.webapp.LogServlet;
|
||||||
import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
|
|
||||||
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
import org.apache.hadoop.yarn.server.webapp.WebServices;
|
||||||
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
|
||||||
|
@ -61,33 +54,20 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
|
||||||
import com.google.common.base.Joiner;
|
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import org.codehaus.jettison.json.JSONException;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
@Path("/ws/v1/applicationhistory")
|
@Path("/ws/v1/applicationhistory")
|
||||||
public class AHSWebServices extends WebServices {
|
public class AHSWebServices extends WebServices {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory
|
private LogServlet logServlet;
|
||||||
.getLogger(AHSWebServices.class);
|
|
||||||
private static final String NM_DOWNLOAD_URI_STR =
|
|
||||||
"/ws/v1/node/containers";
|
|
||||||
private static final Joiner JOINER = Joiner.on("");
|
|
||||||
private static final Joiner DOT_JOINER = Joiner.on(". ");
|
|
||||||
private final Configuration conf;
|
|
||||||
private final LogAggregationFileControllerFactory factory;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public AHSWebServices(ApplicationBaseProtocol appBaseProt,
|
public AHSWebServices(ApplicationBaseProtocol appBaseProt,
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
super(appBaseProt);
|
super(appBaseProt);
|
||||||
this.conf = conf;
|
this.logServlet = new LogServlet(conf, this);
|
||||||
this.factory = new LogAggregationFileControllerFactory(conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@GET
|
@GET
|
||||||
|
@ -244,87 +224,9 @@ public class AHSWebServices extends WebServices {
|
||||||
@QueryParam(YarnWebServiceParams.NM_ID) String nmId,
|
@QueryParam(YarnWebServiceParams.NM_ID) String nmId,
|
||||||
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
||||||
@DefaultValue("false") boolean redirected_from_node) {
|
@DefaultValue("false") boolean redirected_from_node) {
|
||||||
ContainerId containerId = null;
|
|
||||||
initForReadableEndpoints(res);
|
initForReadableEndpoints(res);
|
||||||
try {
|
return logServlet.getContainerLogsInfo(req, containerIdStr, nmId,
|
||||||
containerId = ContainerId.fromString(containerIdStr);
|
redirected_from_node, null);
|
||||||
} catch (IllegalArgumentException e) {
|
|
||||||
throw new BadRequestException("invalid container id, " + containerIdStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
ApplicationId appId = containerId.getApplicationAttemptId()
|
|
||||||
.getApplicationId();
|
|
||||||
AppInfo appInfo;
|
|
||||||
try {
|
|
||||||
appInfo = super.getApp(req, res, appId.toString());
|
|
||||||
} catch (Exception ex) {
|
|
||||||
// directly find logs from HDFS.
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
|
||||||
false);
|
|
||||||
}
|
|
||||||
// if the application finishes, directly find logs
|
|
||||||
// from HDFS.
|
|
||||||
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
|
||||||
false);
|
|
||||||
}
|
|
||||||
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
|
||||||
String appOwner = appInfo.getUser();
|
|
||||||
String nodeHttpAddress = null;
|
|
||||||
if (nmId != null && !nmId.isEmpty()) {
|
|
||||||
try {
|
|
||||||
nodeHttpAddress = getNMWebAddressFromRM(conf, nmId);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
LOG.debug("{}", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
|
|
||||||
ContainerInfo containerInfo;
|
|
||||||
try {
|
|
||||||
containerInfo = super.getContainer(
|
|
||||||
req, res, appId.toString(),
|
|
||||||
containerId.getApplicationAttemptId().toString(),
|
|
||||||
containerId.toString());
|
|
||||||
} catch (Exception ex) {
|
|
||||||
// return log meta for the aggregated logs if exists.
|
|
||||||
// It will also return empty log meta for the local logs.
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.getContainerLogMeta(factory, appId, appOwner, null,
|
|
||||||
containerIdStr, true);
|
|
||||||
}
|
|
||||||
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
|
||||||
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
|
||||||
// we would only get log meta for aggregated logs instead of
|
|
||||||
// re-directing the request
|
|
||||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
|
||||||
|| redirected_from_node) {
|
|
||||||
// return log meta for the aggregated logs if exists.
|
|
||||||
// It will also return empty log meta for the local logs.
|
|
||||||
// If this is the redirect request from NM, we should not
|
|
||||||
// re-direct the request back. Simply output the aggregated log meta.
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.getContainerLogMeta(factory, appId, appOwner, null,
|
|
||||||
containerIdStr, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
String uri = "/" + containerId.toString() + "/logs";
|
|
||||||
String resURI = JOINER.join(
|
|
||||||
LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress),
|
|
||||||
NM_DOWNLOAD_URI_STR, uri);
|
|
||||||
String query = req.getQueryString();
|
|
||||||
if (query != null && !query.isEmpty()) {
|
|
||||||
resURI += "?" + query;
|
|
||||||
}
|
|
||||||
ResponseBuilder response = Response.status(
|
|
||||||
HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
|
||||||
response.header("Location", resURI);
|
|
||||||
return response.build();
|
|
||||||
} else {
|
|
||||||
throw new NotFoundException(
|
|
||||||
"The application is not at Running or Finished State.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -385,93 +287,19 @@ public class AHSWebServices extends WebServices {
|
||||||
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
||||||
@DefaultValue("false") boolean redirected_from_node) {
|
@DefaultValue("false") boolean redirected_from_node) {
|
||||||
initForReadableEndpoints(res);
|
initForReadableEndpoints(res);
|
||||||
ContainerId containerId;
|
return logServlet.getLogFile(req, containerIdStr, filename, format, size,
|
||||||
try {
|
nmId, redirected_from_node, null);
|
||||||
containerId = ContainerId.fromString(containerIdStr);
|
|
||||||
} catch (IllegalArgumentException ex) {
|
|
||||||
return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
|
|
||||||
"Invalid ContainerId: " + containerIdStr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final long length = LogWebServiceUtils.parseLongParam(size);
|
@VisibleForTesting
|
||||||
|
@InterfaceAudience.Private
|
||||||
ApplicationId appId = containerId.getApplicationAttemptId()
|
LogServlet getLogServlet() {
|
||||||
.getApplicationId();
|
return this.logServlet;
|
||||||
AppInfo appInfo;
|
|
||||||
try {
|
|
||||||
appInfo = super.getApp(req, res, appId.toString());
|
|
||||||
} catch (Exception ex) {
|
|
||||||
// directly find logs from HDFS.
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
|
|
||||||
filename, format, length, false);
|
|
||||||
}
|
|
||||||
String appOwner = appInfo.getUser();
|
|
||||||
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
|
||||||
// directly find logs from HDFS.
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
|
||||||
containerIdStr, filename, format, length, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
@VisibleForTesting
|
||||||
String nodeHttpAddress = null;
|
@InterfaceAudience.Private
|
||||||
if (nmId != null && !nmId.isEmpty()) {
|
void setLogServlet(LogServlet logServlet) {
|
||||||
try {
|
this.logServlet = logServlet;
|
||||||
nodeHttpAddress = getNMWebAddressFromRM(conf, nmId);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
LOG.debug("{}", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
|
|
||||||
ContainerInfo containerInfo;
|
|
||||||
try {
|
|
||||||
containerInfo = super.getContainer(
|
|
||||||
req, res, appId.toString(),
|
|
||||||
containerId.getApplicationAttemptId().toString(),
|
|
||||||
containerId.toString());
|
|
||||||
} catch (Exception ex) {
|
|
||||||
// output the aggregated logs
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
|
||||||
containerIdStr, filename, format, length, true);
|
|
||||||
}
|
|
||||||
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
|
||||||
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
|
||||||
// we would only get aggregated logs instead of re-directing the
|
|
||||||
// request.
|
|
||||||
// If this is the redirect request from NM, we should not re-direct the
|
|
||||||
// request back. Simply output the aggregated logs.
|
|
||||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
|
||||||
|| redirected_from_node) {
|
|
||||||
// output the aggregated logs
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
|
||||||
containerIdStr, filename, format, length, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
String uri = "/" + containerId.toString() + "/logs/" + filename;
|
|
||||||
String resURI = JOINER.join(
|
|
||||||
LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress),
|
|
||||||
NM_DOWNLOAD_URI_STR, uri);
|
|
||||||
String query = req.getQueryString();
|
|
||||||
if (query != null && !query.isEmpty()) {
|
|
||||||
resURI += "?" + query;
|
|
||||||
}
|
|
||||||
ResponseBuilder response = Response.status(
|
|
||||||
HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
|
||||||
response.header("Location", resURI);
|
|
||||||
return response.build();
|
|
||||||
} else {
|
|
||||||
return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
|
|
||||||
"The application is not at Running or Finished State.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting @InterfaceAudience.Private
|
|
||||||
public String getNMWebAddressFromRM(Configuration configuration,
|
|
||||||
String nodeId)
|
|
||||||
throws ClientHandlerException, UniformInterfaceException, JSONException {
|
|
||||||
return LogWebServiceUtils.getNMWebAddressFromRM(configuration, nodeId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -24,6 +24,9 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
@ -64,6 +67,7 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.webapp.LogServlet;
|
||||||
import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
|
import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
|
||||||
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
||||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
||||||
|
@ -88,7 +92,6 @@ import org.junit.runners.Parameterized;
|
||||||
import com.google.inject.Guice;
|
import com.google.inject.Guice;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import com.google.inject.servlet.ServletModule;
|
import com.google.inject.servlet.ServletModule;
|
||||||
import com.sun.jersey.api.client.ClientHandlerException;
|
|
||||||
import com.sun.jersey.api.client.ClientResponse;
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
import com.sun.jersey.api.client.ClientResponse.Status;
|
import com.sun.jersey.api.client.ClientResponse.Status;
|
||||||
import com.sun.jersey.api.client.GenericType;
|
import com.sun.jersey.api.client.GenericType;
|
||||||
|
@ -137,17 +140,13 @@ public class TestAHSWebServices extends JerseyTestBase {
|
||||||
};
|
};
|
||||||
historyClientService.init(conf);
|
historyClientService.init(conf);
|
||||||
historyClientService.start();
|
historyClientService.start();
|
||||||
ahsWebservice = new AHSWebServices(historyClientService, conf) {
|
|
||||||
@Override
|
ahsWebservice = new AHSWebServices(historyClientService, conf);
|
||||||
public String getNMWebAddressFromRM(Configuration configuration,
|
LogServlet logServlet = spy(ahsWebservice.getLogServlet());
|
||||||
String nodeId) throws ClientHandlerException,
|
doReturn(null).when(logServlet).getNMWebAddressFromRM(any());
|
||||||
UniformInterfaceException, JSONException {
|
doReturn(NM_WEBADDRESS).when(logServlet).getNMWebAddressFromRM(NM_ID);
|
||||||
if (nodeId.equals(NM_ID)) {
|
ahsWebservice.setLogServlet(logServlet);
|
||||||
return NM_WEBADDRESS;
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
fs = FileSystem.get(conf);
|
fs = FileSystem.get(conf);
|
||||||
GuiceServletConfig.setInjector(
|
GuiceServletConfig.setInjector(
|
||||||
Guice.createInjector(new WebServletModule()));
|
Guice.createInjector(new WebServletModule()));
|
||||||
|
@ -171,7 +170,7 @@ public class TestAHSWebServices extends JerseyTestBase {
|
||||||
@Override
|
@Override
|
||||||
protected void configureServlets() {
|
protected void configureServlets() {
|
||||||
bind(JAXBContextResolver.class);
|
bind(JAXBContextResolver.class);
|
||||||
bind(AHSWebServices.class).toInstance(ahsWebservice);;
|
bind(AHSWebServices.class).toInstance(ahsWebservice);
|
||||||
bind(GenericExceptionHandler.class);
|
bind(GenericExceptionHandler.class);
|
||||||
bind(ApplicationBaseProtocol.class).toInstance(historyClientService);
|
bind(ApplicationBaseProtocol.class).toInstance(historyClientService);
|
||||||
serve("/*").with(GuiceContainer.class);
|
serve("/*").with(GuiceContainer.class);
|
||||||
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.webapp;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Classes implementing this interface are used in the {@link LogServlet}
|
||||||
|
* for providing various application related information.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.LimitedPrivate({"YARN"})
|
||||||
|
public interface AppInfoProvider {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the node HTTP address.
|
||||||
|
*
|
||||||
|
* @param req {@link HttpServletRequest} associated with the request
|
||||||
|
* @param appId the id of the application
|
||||||
|
* @param appAttemptId the id of the application attempt
|
||||||
|
* @param containerId the container id
|
||||||
|
* @param clusterId the id of the cluster
|
||||||
|
* @return the node HTTP address
|
||||||
|
*/
|
||||||
|
String getNodeHttpAddress(HttpServletRequest req,
|
||||||
|
String appId, String appAttemptId, String containerId, String clusterId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@link BasicAppInfo} object that wraps the collected information
|
||||||
|
* about the application.
|
||||||
|
*
|
||||||
|
* @param req {@link HttpServletRequest} associated with the request
|
||||||
|
* @param appId the id of the application
|
||||||
|
* @param clusterId the id of the cluster
|
||||||
|
* @return {@link BasicAppInfo} object
|
||||||
|
*/
|
||||||
|
BasicAppInfo getApp(HttpServletRequest req, String appId, String clusterId);
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.webapp;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
|
import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class that wraps application information
|
||||||
|
* required by the {@link LogServlet} class.
|
||||||
|
*/
|
||||||
|
class BasicAppInfo {
|
||||||
|
private final YarnApplicationState appState;
|
||||||
|
private final String user;
|
||||||
|
|
||||||
|
BasicAppInfo(YarnApplicationState appState, String user) {
|
||||||
|
this.appState = appState;
|
||||||
|
this.user = user;
|
||||||
|
}
|
||||||
|
|
||||||
|
static BasicAppInfo fromAppInfo(AppInfo report) {
|
||||||
|
return new BasicAppInfo(report.getAppState(), report.getUser());
|
||||||
|
}
|
||||||
|
|
||||||
|
YarnApplicationState getAppState() {
|
||||||
|
return this.appState;
|
||||||
|
}
|
||||||
|
|
||||||
|
String getUser() {
|
||||||
|
return this.user;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,260 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.webapp;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.sun.jersey.api.client.ClientHandlerException;
|
||||||
|
import com.sun.jersey.api.client.UniformInterfaceException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||||
|
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||||
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
|
import org.codehaus.jettison.json.JSONException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
import javax.ws.rs.core.Response;
|
||||||
|
import javax.ws.rs.core.Response.Status;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extracts aggregated logs and related information.
|
||||||
|
* Used by various WebServices (AHS, ATS).
|
||||||
|
*/
|
||||||
|
public class LogServlet extends Configured {
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(LogServlet.class);
|
||||||
|
|
||||||
|
private static final Joiner JOINER = Joiner.on("");
|
||||||
|
private static final String NM_DOWNLOAD_URI_STR = "/ws/v1/node/containers";
|
||||||
|
|
||||||
|
private final LogAggregationFileControllerFactory factory;
|
||||||
|
private final AppInfoProvider appInfoProvider;
|
||||||
|
|
||||||
|
public LogServlet(Configuration conf, AppInfoProvider appInfoProvider) {
|
||||||
|
super(conf);
|
||||||
|
this.factory = new LogAggregationFileControllerFactory(conf);
|
||||||
|
this.appInfoProvider = appInfoProvider;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public String getNMWebAddressFromRM(String nodeId)
|
||||||
|
throws ClientHandlerException, UniformInterfaceException, JSONException {
|
||||||
|
return LogWebServiceUtils.getNMWebAddressFromRM(getConf(), nodeId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns information about the logs for a specific container.
|
||||||
|
*
|
||||||
|
* @param req the {@link HttpServletRequest}
|
||||||
|
* @param containerIdStr container id
|
||||||
|
* @param nmId NodeManager id
|
||||||
|
* @param redirectedFromNode whether the request was redirected
|
||||||
|
* @param clusterId the id of the cluster
|
||||||
|
* @return {@link Response} object containing information about the logs
|
||||||
|
*/
|
||||||
|
public Response getContainerLogsInfo(HttpServletRequest req,
|
||||||
|
String containerIdStr, String nmId, boolean redirectedFromNode,
|
||||||
|
String clusterId) {
|
||||||
|
ContainerId containerId = null;
|
||||||
|
try {
|
||||||
|
containerId = ContainerId.fromString(containerIdStr);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
throw new BadRequestException("invalid container id, " + containerIdStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
ApplicationId appId = containerId.getApplicationAttemptId()
|
||||||
|
.getApplicationId();
|
||||||
|
BasicAppInfo appInfo;
|
||||||
|
try {
|
||||||
|
appInfo = appInfoProvider.getApp(req, appId.toString(), clusterId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// directly find logs from HDFS.
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
||||||
|
false);
|
||||||
|
}
|
||||||
|
// if the application finishes, directly find logs
|
||||||
|
// from HDFS.
|
||||||
|
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
||||||
|
false);
|
||||||
|
}
|
||||||
|
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
||||||
|
String appOwner = appInfo.getUser();
|
||||||
|
String nodeHttpAddress = null;
|
||||||
|
if (nmId != null && !nmId.isEmpty()) {
|
||||||
|
try {
|
||||||
|
nodeHttpAddress = getNMWebAddressFromRM(nmId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.info("Exception during getting NM web address.", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
|
||||||
|
try {
|
||||||
|
nodeHttpAddress = appInfoProvider.getNodeHttpAddress(
|
||||||
|
req, appId.toString(),
|
||||||
|
containerId.getApplicationAttemptId().toString(),
|
||||||
|
containerId.toString(), clusterId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// return log meta for the aggregated logs if exists.
|
||||||
|
// It will also return empty log meta for the local logs.
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.getContainerLogMeta(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, true);
|
||||||
|
}
|
||||||
|
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
||||||
|
// we would only get log meta for aggregated logs instead of
|
||||||
|
// re-directing the request
|
||||||
|
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
||||||
|
|| redirectedFromNode) {
|
||||||
|
// return log meta for the aggregated logs if exists.
|
||||||
|
// It will also return empty log meta for the local logs.
|
||||||
|
// If this is the redirect request from NM, we should not
|
||||||
|
// re-direct the request back. Simply output the aggregated log meta.
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.getContainerLogMeta(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String uri = "/" + containerId.toString() + "/logs";
|
||||||
|
String resURI = JOINER.join(
|
||||||
|
LogWebServiceUtils.getAbsoluteNMWebAddress(getConf(),
|
||||||
|
nodeHttpAddress),
|
||||||
|
NM_DOWNLOAD_URI_STR, uri);
|
||||||
|
String query = req.getQueryString();
|
||||||
|
if (query != null && !query.isEmpty()) {
|
||||||
|
resURI += "?" + query;
|
||||||
|
}
|
||||||
|
Response.ResponseBuilder response = Response.status(
|
||||||
|
HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
||||||
|
response.header("Location", resURI);
|
||||||
|
return response.build();
|
||||||
|
} else {
|
||||||
|
throw new NotFoundException(
|
||||||
|
"The application is not at Running or Finished State.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an aggregated log file belonging to a container.
|
||||||
|
*
|
||||||
|
* @param req the {@link HttpServletRequest}
|
||||||
|
* @param containerIdStr container id
|
||||||
|
* @param filename the name of the file
|
||||||
|
* @param format the format of the response
|
||||||
|
* @param size the size of bytes of the log file that should be returned
|
||||||
|
* @param nmId NodeManager id
|
||||||
|
* @param redirectedFromNode whether the request was redirected
|
||||||
|
* @param clusterId the id of the cluster
|
||||||
|
* @return {@link Response} object containing information about the logs
|
||||||
|
*/
|
||||||
|
public Response getLogFile(HttpServletRequest req, String containerIdStr,
|
||||||
|
String filename, String format, String size, String nmId,
|
||||||
|
boolean redirectedFromNode, String clusterId) {
|
||||||
|
ContainerId containerId;
|
||||||
|
try {
|
||||||
|
containerId = ContainerId.fromString(containerIdStr);
|
||||||
|
} catch (IllegalArgumentException ex) {
|
||||||
|
return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
|
||||||
|
"Invalid ContainerId: " + containerIdStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
final long length = LogWebServiceUtils.parseLongParam(size);
|
||||||
|
|
||||||
|
ApplicationId appId = containerId.getApplicationAttemptId()
|
||||||
|
.getApplicationId();
|
||||||
|
BasicAppInfo appInfo;
|
||||||
|
try {
|
||||||
|
appInfo = appInfoProvider.getApp(req, appId.toString(), clusterId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// directly find logs from HDFS.
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
|
||||||
|
filename, format, length, false);
|
||||||
|
}
|
||||||
|
String appOwner = appInfo.getUser();
|
||||||
|
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
||||||
|
// directly find logs from HDFS.
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, filename, format, length, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
||||||
|
String nodeHttpAddress = null;
|
||||||
|
if (nmId != null && !nmId.isEmpty()) {
|
||||||
|
try {
|
||||||
|
nodeHttpAddress = getNMWebAddressFromRM(nmId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.debug("Exception happened during obtaining NM web address " +
|
||||||
|
"from RM.", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
|
||||||
|
try {
|
||||||
|
nodeHttpAddress = appInfoProvider.getNodeHttpAddress(
|
||||||
|
req, appId.toString(),
|
||||||
|
containerId.getApplicationAttemptId().toString(),
|
||||||
|
containerId.toString(), clusterId);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// output the aggregated logs
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, filename, format, length, true);
|
||||||
|
}
|
||||||
|
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
||||||
|
// we would only get aggregated logs instead of re-directing the
|
||||||
|
// request.
|
||||||
|
// If this is the redirect request from NM, we should not re-direct the
|
||||||
|
// request back. Simply output the aggregated logs.
|
||||||
|
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
||||||
|
|| redirectedFromNode) {
|
||||||
|
// output the aggregated logs
|
||||||
|
return LogWebServiceUtils
|
||||||
|
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
||||||
|
containerIdStr, filename, format, length, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String uri = "/" + containerId.toString() + "/logs/" + filename;
|
||||||
|
String resURI = JOINER.join(
|
||||||
|
LogWebServiceUtils.getAbsoluteNMWebAddress(getConf(),
|
||||||
|
nodeHttpAddress),
|
||||||
|
NM_DOWNLOAD_URI_STR, uri);
|
||||||
|
String query = req.getQueryString();
|
||||||
|
if (query != null && !query.isEmpty()) {
|
||||||
|
resURI += "?" + query;
|
||||||
|
}
|
||||||
|
Response.ResponseBuilder response = Response.status(
|
||||||
|
HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
||||||
|
response.header("Location", resURI);
|
||||||
|
return response.build();
|
||||||
|
} else {
|
||||||
|
return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
|
||||||
|
"The application is not at Running or Finished State.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,8 +35,6 @@ import org.apache.hadoop.http.JettyUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||||
|
@ -44,8 +42,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||||
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
||||||
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
|
||||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
|
||||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
|
||||||
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -71,7 +67,9 @@ import java.security.PrivilegedExceptionAction;
|
||||||
/**
|
/**
|
||||||
* Support only ATSv2 client only.
|
* Support only ATSv2 client only.
|
||||||
*/
|
*/
|
||||||
@Singleton @Path("/ws/v2/applicationlog") public class LogWebService {
|
@Singleton
|
||||||
|
@Path("/ws/v2/applicationlog")
|
||||||
|
public class LogWebService implements AppInfoProvider {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(LogWebService.class);
|
LoggerFactory.getLogger(LogWebService.class);
|
||||||
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
|
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
|
||||||
|
@ -81,6 +79,8 @@ import java.security.PrivilegedExceptionAction;
|
||||||
private static LogAggregationFileControllerFactory factory;
|
private static LogAggregationFileControllerFactory factory;
|
||||||
private static String base;
|
private static String base;
|
||||||
private static String defaultClusterid;
|
private static String defaultClusterid;
|
||||||
|
|
||||||
|
private final LogServlet logServlet;
|
||||||
private volatile Client webTimelineClient;
|
private volatile Client webTimelineClient;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
@ -99,6 +99,10 @@ import java.security.PrivilegedExceptionAction;
|
||||||
+ " for URI: " + base);
|
+ " for URI: " + base);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public LogWebService() {
|
||||||
|
this.logServlet = new LogServlet(yarnConf, this);
|
||||||
|
}
|
||||||
|
|
||||||
private Client createTimelineWebClient() {
|
private Client createTimelineWebClient() {
|
||||||
ClientConfig cfg = new DefaultClientConfig();
|
ClientConfig cfg = new DefaultClientConfig();
|
||||||
cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
||||||
|
@ -136,7 +140,8 @@ import java.security.PrivilegedExceptionAction;
|
||||||
* @param redirectedFromNode Whether this is a redirected request from NM
|
* @param redirectedFromNode Whether this is a redirected request from NM
|
||||||
* @return The log file's name and current file size
|
* @return The log file's name and current file size
|
||||||
*/
|
*/
|
||||||
@GET @Path("/containers/{containerid}/logs")
|
@GET
|
||||||
|
@Path("/containers/{containerid}/logs")
|
||||||
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||||
public Response getContainerLogsInfo(@Context HttpServletRequest req,
|
public Response getContainerLogsInfo(@Context HttpServletRequest req,
|
||||||
@Context HttpServletResponse res,
|
@Context HttpServletResponse res,
|
||||||
|
@ -145,91 +150,14 @@ import java.security.PrivilegedExceptionAction;
|
||||||
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
|
||||||
@DefaultValue("false") boolean redirectedFromNode,
|
@DefaultValue("false") boolean redirectedFromNode,
|
||||||
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
|
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
|
||||||
ContainerId containerId = null;
|
|
||||||
initForReadableEndpoints(res);
|
initForReadableEndpoints(res);
|
||||||
try {
|
return logServlet.getContainerLogsInfo(req, containerIdStr, nmId,
|
||||||
containerId = ContainerId.fromString(containerIdStr);
|
redirectedFromNode, clusterId);
|
||||||
} catch (IllegalArgumentException e) {
|
|
||||||
throw new BadRequestException("invalid container id, " + containerIdStr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ApplicationId appId =
|
@Override
|
||||||
containerId.getApplicationAttemptId().getApplicationId();
|
public String getNodeHttpAddress(HttpServletRequest req, String appId,
|
||||||
AppInfo appInfo;
|
String appAttemptId, String containerId, String clusterId) {
|
||||||
try {
|
|
||||||
appInfo = getApp(req, appId.toString(), clusterId);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
// directly find logs from HDFS.
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
|
||||||
false);
|
|
||||||
}
|
|
||||||
// if the application finishes, directly find logs
|
|
||||||
// from HDFS.
|
|
||||||
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
|
|
||||||
false);
|
|
||||||
}
|
|
||||||
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
|
||||||
String appOwner = appInfo.getUser();
|
|
||||||
String nodeHttpAddress = null;
|
|
||||||
if (nmId != null && !nmId.isEmpty()) {
|
|
||||||
try {
|
|
||||||
nodeHttpAddress =
|
|
||||||
LogWebServiceUtils.getNMWebAddressFromRM(yarnConf, nmId);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
LOG.debug("{}", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
|
|
||||||
ContainerInfo containerInfo;
|
|
||||||
try {
|
|
||||||
containerInfo =
|
|
||||||
getContainer(req, appId.toString(), containerId.toString(),
|
|
||||||
clusterId);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
// return log meta for the aggregated logs if exists.
|
|
||||||
// It will also return empty log meta for the local logs.
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.getContainerLogMeta(factory, appId, appOwner, null,
|
|
||||||
containerIdStr, true);
|
|
||||||
}
|
|
||||||
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
|
||||||
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
|
||||||
// we would only get log meta for aggregated logs instead of
|
|
||||||
// re-directing the request
|
|
||||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
|
||||||
|| redirectedFromNode) {
|
|
||||||
// return log meta for the aggregated logs if exists.
|
|
||||||
// It will also return empty log meta for the local logs.
|
|
||||||
// If this is the redirect request from NM, we should not
|
|
||||||
// re-direct the request back. Simply output the aggregated log meta.
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.getContainerLogMeta(factory, appId, appOwner, null,
|
|
||||||
containerIdStr, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
String uri = "/" + containerId.toString() + "/logs";
|
|
||||||
String resURI = JOINER.join(
|
|
||||||
LogWebServiceUtils.getAbsoluteNMWebAddress(yarnConf, nodeHttpAddress),
|
|
||||||
NM_DOWNLOAD_URI_STR, uri);
|
|
||||||
String query = req.getQueryString();
|
|
||||||
if (query != null && !query.isEmpty()) {
|
|
||||||
resURI += "?" + query;
|
|
||||||
}
|
|
||||||
Response.ResponseBuilder response =
|
|
||||||
Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
|
||||||
response.header("Location", resURI);
|
|
||||||
return response.build();
|
|
||||||
} else {
|
|
||||||
throw new NotFoundException(
|
|
||||||
"The application is not at Running or Finished State.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected ContainerInfo getContainer(HttpServletRequest req, String appId,
|
|
||||||
String containerId, String clusterId) {
|
|
||||||
UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req);
|
UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req);
|
||||||
String cId = clusterId != null ? clusterId : defaultClusterid;
|
String cId = clusterId != null ? clusterId : defaultClusterid;
|
||||||
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
|
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
|
||||||
|
@ -255,14 +183,12 @@ import java.security.PrivilegedExceptionAction;
|
||||||
if (conEntity == null) {
|
if (conEntity == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
String nodeHttpAddress = (String) conEntity.getInfo()
|
return (String) conEntity.getInfo()
|
||||||
.get(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO);
|
.get(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO);
|
||||||
|
|
||||||
ContainerInfo info = new ContainerInfo(nodeHttpAddress);
|
|
||||||
return info;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AppInfo getApp(HttpServletRequest req, String appId,
|
@Override
|
||||||
|
public BasicAppInfo getApp(HttpServletRequest req, String appId,
|
||||||
String clusterId) {
|
String clusterId) {
|
||||||
UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req);
|
UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req);
|
||||||
|
|
||||||
|
@ -296,8 +222,7 @@ import java.security.PrivilegedExceptionAction;
|
||||||
String state = (String) appEntity.getInfo()
|
String state = (String) appEntity.getInfo()
|
||||||
.get(ApplicationMetricsConstants.STATE_EVENT_INFO);
|
.get(ApplicationMetricsConstants.STATE_EVENT_INFO);
|
||||||
YarnApplicationState appState = YarnApplicationState.valueOf(state);
|
YarnApplicationState appState = YarnApplicationState.valueOf(state);
|
||||||
AppInfo info = new AppInfo(appState, appOwner);
|
return new BasicAppInfo(appState, appOwner);
|
||||||
return info;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -313,9 +238,12 @@ import java.security.PrivilegedExceptionAction;
|
||||||
* @param redirectedFromNode Whether this is the redirect request from NM
|
* @param redirectedFromNode Whether this is the redirect request from NM
|
||||||
* @return The contents of the container's log file
|
* @return The contents of the container's log file
|
||||||
*/
|
*/
|
||||||
@GET @Path("/containers/{containerid}/logs/{filename}")
|
@GET
|
||||||
@Produces({ MediaType.TEXT_PLAIN }) @InterfaceAudience.Public
|
@Path("/containers/{containerid}/logs/{filename}")
|
||||||
@InterfaceStability.Unstable public Response getContainerLogFile(
|
@Produces({ MediaType.TEXT_PLAIN })
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public Response getContainerLogFile(
|
||||||
@Context HttpServletRequest req, @Context HttpServletResponse res,
|
@Context HttpServletRequest req, @Context HttpServletResponse res,
|
||||||
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
|
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
|
||||||
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename,
|
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename,
|
||||||
|
@ -332,9 +260,11 @@ import java.security.PrivilegedExceptionAction;
|
||||||
//TODO: YARN-4993: Refactory ContainersLogsBlock, AggregatedLogsBlock and
|
//TODO: YARN-4993: Refactory ContainersLogsBlock, AggregatedLogsBlock and
|
||||||
// container log webservice introduced in AHS to minimize
|
// container log webservice introduced in AHS to minimize
|
||||||
// the duplication.
|
// the duplication.
|
||||||
@GET @Path("/containerlogs/{containerid}/{filename}")
|
@GET
|
||||||
|
@Path("/containerlogs/{containerid}/{filename}")
|
||||||
@Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 })
|
@Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 })
|
||||||
@InterfaceAudience.Public @InterfaceStability.Unstable
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Unstable
|
||||||
public Response getLogs(@Context HttpServletRequest req,
|
public Response getLogs(@Context HttpServletRequest req,
|
||||||
@Context HttpServletResponse res,
|
@Context HttpServletResponse res,
|
||||||
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
|
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
|
||||||
|
@ -346,117 +276,8 @@ import java.security.PrivilegedExceptionAction;
|
||||||
@DefaultValue("false") boolean redirectedFromNode,
|
@DefaultValue("false") boolean redirectedFromNode,
|
||||||
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
|
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
|
||||||
initForReadableEndpoints(res);
|
initForReadableEndpoints(res);
|
||||||
ContainerId containerId;
|
return logServlet.getLogFile(req, containerIdStr, filename, format, size,
|
||||||
try {
|
nmId, redirectedFromNode, clusterId);
|
||||||
containerId = ContainerId.fromString(containerIdStr);
|
|
||||||
} catch (IllegalArgumentException ex) {
|
|
||||||
return LogWebServiceUtils.createBadResponse(Response.Status.NOT_FOUND,
|
|
||||||
"Invalid ContainerId: " + containerIdStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
final long length = LogWebServiceUtils.parseLongParam(size);
|
|
||||||
|
|
||||||
ApplicationId appId =
|
|
||||||
containerId.getApplicationAttemptId().getApplicationId();
|
|
||||||
AppInfo appInfo;
|
|
||||||
try {
|
|
||||||
appInfo = getApp(req, appId.toString(), clusterId);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
// directly find logs from HDFS.
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
|
|
||||||
filename, format, length, false);
|
|
||||||
}
|
|
||||||
String appOwner = appInfo.getUser();
|
|
||||||
if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
|
|
||||||
// directly find logs from HDFS.
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
|
||||||
containerIdStr, filename, format, length, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
|
|
||||||
String nodeHttpAddress = null;
|
|
||||||
if (nmId != null && !nmId.isEmpty()) {
|
|
||||||
try {
|
|
||||||
nodeHttpAddress =
|
|
||||||
LogWebServiceUtils.getNMWebAddressFromRM(yarnConf, nmId);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
LOG.debug("{}", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
|
|
||||||
ContainerInfo containerInfo;
|
|
||||||
try {
|
|
||||||
containerInfo =
|
|
||||||
getContainer(req, appId.toString(), containerId.toString(),
|
|
||||||
clusterId);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
// output the aggregated logs
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
|
||||||
containerIdStr, filename, format, length, true);
|
|
||||||
}
|
|
||||||
nodeHttpAddress = containerInfo.getNodeHttpAddress();
|
|
||||||
// make sure nodeHttpAddress is not null and not empty. Otherwise,
|
|
||||||
// we would only get aggregated logs instead of re-directing the
|
|
||||||
// request.
|
|
||||||
// If this is the redirect request from NM, we should not re-direct the
|
|
||||||
// request back. Simply output the aggregated logs.
|
|
||||||
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|
|
||||||
|| redirectedFromNode) {
|
|
||||||
// output the aggregated logs
|
|
||||||
return LogWebServiceUtils
|
|
||||||
.sendStreamOutputResponse(factory, appId, appOwner, null,
|
|
||||||
containerIdStr, filename, format, length, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
String uri = "/" + containerId.toString() + "/logs/" + filename;
|
|
||||||
String resURI = JOINER.join(
|
|
||||||
LogWebServiceUtils.getAbsoluteNMWebAddress(yarnConf, nodeHttpAddress),
|
|
||||||
NM_DOWNLOAD_URI_STR, uri);
|
|
||||||
String query = req.getQueryString();
|
|
||||||
if (query != null && !query.isEmpty()) {
|
|
||||||
resURI += "?" + query;
|
|
||||||
}
|
|
||||||
Response.ResponseBuilder response =
|
|
||||||
Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
|
||||||
response.header("Location", resURI);
|
|
||||||
return response.build();
|
|
||||||
} else {
|
|
||||||
return LogWebServiceUtils.createBadResponse(Response.Status.NOT_FOUND,
|
|
||||||
"The application is not at Running or Finished State.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static class AppInfo {
|
|
||||||
private YarnApplicationState appState;
|
|
||||||
private String user;
|
|
||||||
|
|
||||||
AppInfo(YarnApplicationState appState, String user) {
|
|
||||||
this.appState = appState;
|
|
||||||
this.user = user;
|
|
||||||
}
|
|
||||||
|
|
||||||
public YarnApplicationState getAppState() {
|
|
||||||
return this.appState;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getUser() {
|
|
||||||
return this.user;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static class ContainerInfo {
|
|
||||||
private String nodeHttpAddress;
|
|
||||||
|
|
||||||
ContainerInfo(String nodeHttpAddress) {
|
|
||||||
this.nodeHttpAddress = nodeHttpAddress;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getNodeHttpAddress() {
|
|
||||||
return nodeHttpAddress;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting protected TimelineEntity getEntity(String path,
|
@VisibleForTesting protected TimelineEntity getEntity(String path,
|
||||||
|
|
|
@ -64,7 +64,7 @@ import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||||
import org.apache.hadoop.yarn.webapp.ForbiddenException;
|
import org.apache.hadoop.yarn.webapp.ForbiddenException;
|
||||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
|
|
||||||
public class WebServices {
|
public class WebServices implements AppInfoProvider {
|
||||||
|
|
||||||
protected ApplicationBaseProtocol appBaseProt;
|
protected ApplicationBaseProtocol appBaseProt;
|
||||||
|
|
||||||
|
@ -219,8 +219,18 @@ public class WebServices {
|
||||||
return allApps;
|
return allApps;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AppInfo getApp(HttpServletRequest req, HttpServletResponse res,
|
public AppInfo getApp(HttpServletRequest req,
|
||||||
String appId) {
|
HttpServletResponse res, String appId) {
|
||||||
|
return getApp(req, appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BasicAppInfo getApp(HttpServletRequest req, String appId,
|
||||||
|
String clusterId) {
|
||||||
|
return BasicAppInfo.fromAppInfo(getApp(req, appId));
|
||||||
|
}
|
||||||
|
|
||||||
|
public AppInfo getApp(HttpServletRequest req, String appId) {
|
||||||
UserGroupInformation callerUGI = getUser(req);
|
UserGroupInformation callerUGI = getUser(req);
|
||||||
final ApplicationId id = parseApplicationId(appId);
|
final ApplicationId id = parseApplicationId(appId);
|
||||||
ApplicationReport app = null;
|
ApplicationReport app = null;
|
||||||
|
@ -356,8 +366,17 @@ public class WebServices {
|
||||||
return containersInfo;
|
return containersInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getNodeHttpAddress(HttpServletRequest req,
|
||||||
|
String appId, String appAttemptId,
|
||||||
|
String containerId, String clusterId) {
|
||||||
|
ContainerInfo containerInfo = getContainer(req, appId,
|
||||||
|
appAttemptId, containerId);
|
||||||
|
return containerInfo.getNodeHttpAddress();
|
||||||
|
}
|
||||||
|
|
||||||
public ContainerInfo getContainer(HttpServletRequest req,
|
public ContainerInfo getContainer(HttpServletRequest req,
|
||||||
HttpServletResponse res, String appId, String appAttemptId,
|
String appId, String appAttemptId,
|
||||||
String containerId) {
|
String containerId) {
|
||||||
UserGroupInformation callerUGI = getUser(req);
|
UserGroupInformation callerUGI = getUser(req);
|
||||||
ApplicationId aid = parseApplicationId(appId);
|
ApplicationId aid = parseApplicationId(appId);
|
||||||
|
@ -392,6 +411,12 @@ public class WebServices {
|
||||||
return new ContainerInfo(container);
|
return new ContainerInfo(container);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ContainerInfo getContainer(HttpServletRequest req,
|
||||||
|
HttpServletResponse res, String appId, String appAttemptId,
|
||||||
|
String containerId) {
|
||||||
|
return getContainer(req, appId, appAttemptId, containerId);
|
||||||
|
}
|
||||||
|
|
||||||
protected void initForReadableEndpoints(HttpServletResponse response) {
|
protected void initForReadableEndpoints(HttpServletResponse response) {
|
||||||
// clear content type
|
// clear content type
|
||||||
response.setContentType(null);
|
response.setContentType(null);
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.webapp;
|
|
@ -42,14 +42,14 @@ public class TestLogWebService {
|
||||||
|
|
||||||
private HttpServletRequest request;
|
private HttpServletRequest request;
|
||||||
private LogWebServiceTest logWebService;
|
private LogWebServiceTest logWebService;
|
||||||
private static TimelineEntity entity;
|
|
||||||
private ApplicationId appId;
|
private ApplicationId appId;
|
||||||
private ContainerId cId;
|
private ContainerId cId;
|
||||||
private String user = "user1";
|
private String user = "user1";
|
||||||
private Map<String, TimelineEntity> entities;
|
private Map<String, TimelineEntity> entities;
|
||||||
private String nodeHttpAddress = "localhost:0";
|
private String nodeHttpAddress = "localhost:0";
|
||||||
|
|
||||||
@Before public void setup() throws Exception {
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
appId = ApplicationId.fromString("application_1518143905142_509690");
|
appId = ApplicationId.fromString("application_1518143905142_509690");
|
||||||
cId =
|
cId =
|
||||||
ContainerId.fromString("container_e138_1518143905142_509690_01_000001");
|
ContainerId.fromString("container_e138_1518143905142_509690_01_000001");
|
||||||
|
@ -62,23 +62,26 @@ public class TestLogWebService {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void testGetApp() {
|
@Test
|
||||||
|
public void testGetApp() {
|
||||||
LogWebService.AppInfo app =
|
BasicAppInfo app =
|
||||||
logWebService.getApp(request, appId.toString(), null);
|
logWebService.getApp(request, appId.toString(), null);
|
||||||
Assert.assertEquals("RUNNING", app.getAppState().toString());
|
Assert.assertEquals("RUNNING", app.getAppState().toString());
|
||||||
Assert.assertEquals(user, app.getUser());
|
Assert.assertEquals(user, app.getUser());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void testGetContainer() {
|
@Test
|
||||||
LogWebService.ContainerInfo container = logWebService
|
public void testGetContainer() {
|
||||||
.getContainer(request, appId.toString(), cId.toString(), null);
|
String address = logWebService
|
||||||
Assert.assertEquals(nodeHttpAddress, container.getNodeHttpAddress());
|
.getNodeHttpAddress(request, appId.toString(), null, cId.toString(),
|
||||||
|
null);
|
||||||
|
Assert.assertEquals(this.nodeHttpAddress, address);
|
||||||
}
|
}
|
||||||
|
|
||||||
class LogWebServiceTest extends LogWebService {
|
class LogWebServiceTest extends LogWebService {
|
||||||
|
|
||||||
@Override protected TimelineEntity getEntity(String path,
|
@Override
|
||||||
|
protected TimelineEntity getEntity(String path,
|
||||||
MultivaluedMap<String, String> params) throws IOException {
|
MultivaluedMap<String, String> params) throws IOException {
|
||||||
if (path.endsWith(cId.toString())) {
|
if (path.endsWith(cId.toString())) {
|
||||||
return entities.get(cId.toString());
|
return entities.get(cId.toString());
|
||||||
|
|
Loading…
Reference in New Issue