YARN-10101. Support listing of aggregated logs for containers belonging to an application attempt. Contributed by Adam Antal

This commit is contained in:
Szilard Nemeth 2020-02-06 12:16:12 +01:00
parent 314e2f9d2e
commit 71b2c2ffe9
18 changed files with 1295 additions and 141 deletions

View File

@ -69,6 +69,7 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.webapp.WrappedLogMetaRequest;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams; import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
import org.apache.hadoop.yarn.server.webapp.LogServlet; import org.apache.hadoop.yarn.server.webapp.LogServlet;
import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.WebServices;
@ -83,7 +84,7 @@
public class HsWebServices extends WebServices { public class HsWebServices extends WebServices {
private final HistoryContext ctx; private final HistoryContext ctx;
private WebApp webapp; private WebApp webapp;
private final LogServlet logServlet; private LogServlet logServlet;
private @Context HttpServletResponse response; private @Context HttpServletResponse response;
@Context UriInfo uriInfo; @Context UriInfo uriInfo;
@ -422,18 +423,39 @@ public JobTaskAttemptCounterInfo getJobTaskAttemptIdCounters(
return new JobTaskAttemptCounterInfo(ta); return new JobTaskAttemptCounterInfo(ta);
} }
@GET
@Path("/aggregatedlogs")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@InterfaceAudience.Public
@InterfaceStability.Unstable
public Response getAggregatedLogsMeta(@Context HttpServletRequest hsr,
@QueryParam(YarnWebServiceParams.APP_ID) String appIdStr,
@QueryParam(YarnWebServiceParams.APPATTEMPT_ID) String appAttemptIdStr,
@QueryParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
@QueryParam(YarnWebServiceParams.NM_ID) String nmId,
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
@DefaultValue("false") boolean redirectedFromNode) {
init();
return logServlet.getLogsInfo(hsr, appIdStr, appAttemptIdStr,
containerIdStr, nmId, redirectedFromNode);
}
@GET @GET
@Path("/containers/{containerid}/logs") @Path("/containers/{containerid}/logs")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Unstable
public Response getLogs(@Context HttpServletRequest hsr, public Response getContainerLogs(@Context HttpServletRequest hsr,
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
@QueryParam(YarnWebServiceParams.NM_ID) String nmId, @QueryParam(YarnWebServiceParams.NM_ID) String nmId,
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE) @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
@DefaultValue("false") boolean redirectedFromNode) { @DefaultValue("false") boolean redirectedFromNode) {
init(); init();
return logServlet.getContainerLogsInfo(hsr, containerIdStr, nmId,
WrappedLogMetaRequest.Builder logMetaRequestBuilder =
LogServlet.createRequestFromContainerId(containerIdStr);
return logServlet.getContainerLogsInfo(hsr, logMetaRequestBuilder, nmId,
redirectedFromNode, null); redirectedFromNode, null);
} }
@ -442,7 +464,7 @@ public Response getLogs(@Context HttpServletRequest hsr,
@Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 }) @Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 })
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Unstable
public Response getLogs(@Context HttpServletRequest req, public Response getContainerLogFile(@Context HttpServletRequest req,
@PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
@PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) @PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME)
String filename, String filename,
@ -457,4 +479,14 @@ public Response getLogs(@Context HttpServletRequest req,
return logServlet.getLogFile(req, containerIdStr, filename, format, size, return logServlet.getLogFile(req, containerIdStr, filename, format, size,
nmId, redirectedFromNode, null); nmId, redirectedFromNode, null);
} }
@VisibleForTesting
LogServlet getLogServlet() {
return this.logServlet;
}
@VisibleForTesting
void setLogServlet(LogServlet logServlet) {
this.logServlet = logServlet;
}
} }

View File

@ -0,0 +1,766 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs.webapp;
import com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryContext;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
import org.apache.hadoop.yarn.server.webapp.LogServlet;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
* We created the following aggregated log structure, and test the log
* related API endpoints of {@link HsWebServices}.
*
* application_1 is finished
* attempt_1
* container_1 finished on node_1 syslog
* container_2 finished on node_1 syslog
* container_3 finished on node_2 syslog
* attempt_2
* container_1 finished on node_1 syslog
*
* application_2 is running
* attempt_1
* container_1 finished on node_1 syslog
* attempt_2
* container_1 finished on node_1 syslog
* container_2 running on node_1 syslog
* container_3 running on node_2 syslog (with some already aggregated log)
*
*/
public class TestHsWebServicesLogs extends JerseyTestBase {
private static Configuration conf = new YarnConfiguration();
private static FileSystem fs;
private static final String LOCAL_ROOT_LOG_DIR = "target/LocalLogs";
private static final String REMOTE_LOG_ROOT_DIR = "target/logs/";
private static final String USER = "fakeUser";
private static final String FILE_NAME = "syslog";
private static final String NM_WEBADDRESS_1 = "test-nm-web-address-1:9999";
private static final NodeId NM_ID_1 = NodeId.newInstance("fakeHost1", 9951);
private static final String NM_WEBADDRESS_2 = "test-nm-web-address-2:9999";
private static final NodeId NM_ID_2 = NodeId.newInstance("fakeHost2", 9952);
private static final ApplicationId APPID_1 = ApplicationId.newInstance(1, 1);
private static final ApplicationId APPID_2 = ApplicationId.newInstance(10, 2);
private static final ApplicationAttemptId APP_ATTEMPT_1_1 =
ApplicationAttemptId.newInstance(APPID_1, 1);
private static final ApplicationAttemptId APP_ATTEMPT_1_2 =
ApplicationAttemptId.newInstance(APPID_1, 2);
private static final ApplicationAttemptId APP_ATTEMPT_2_1 =
ApplicationAttemptId.newInstance(APPID_2, 1);
private static final ApplicationAttemptId APP_ATTEMPT_2_2 =
ApplicationAttemptId.newInstance(APPID_2, 2);
private static final ContainerId CONTAINER_1_1_1 =
ContainerId.newContainerId(APP_ATTEMPT_1_1, 1);
private static final ContainerId CONTAINER_1_1_2 =
ContainerId.newContainerId(APP_ATTEMPT_1_1, 2);
private static final ContainerId CONTAINER_1_1_3 =
ContainerId.newContainerId(APP_ATTEMPT_1_1, 3);
private static final ContainerId CONTAINER_1_2_1 =
ContainerId.newContainerId(APP_ATTEMPT_1_2, 1);
private static final ContainerId CONTAINER_2_1_1 =
ContainerId.newContainerId(APP_ATTEMPT_2_1, 1);
private static final ContainerId CONTAINER_2_2_1 =
ContainerId.newContainerId(APP_ATTEMPT_2_2, 1);
private static final ContainerId CONTAINER_2_2_2 =
ContainerId.newContainerId(APP_ATTEMPT_2_2, 2);
private static final ContainerId CONTAINER_2_2_3 =
ContainerId.newContainerId(APP_ATTEMPT_2_2, 3);
static {
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT_DIR);
}
private static class WebServletModule extends ServletModule {
@Override
protected void configureServlets() {
MockHistoryContext appContext = new MockHistoryContext(0, 1, 2, 1);
HsWebApp webApp = mock(HsWebApp.class);
when(webApp.name()).thenReturn("hsmockwebapp");
ApplicationClientProtocol mockProtocol =
mock(ApplicationClientProtocol.class);
try {
doAnswer(invocationOnMock -> {
GetApplicationReportRequest request =
invocationOnMock.getArgument(0);
// returning the latest application attempt for each application
if (request.getApplicationId().equals(APPID_1)) {
return GetApplicationReportResponse.newInstance(
newApplicationReport(APPID_1, APP_ATTEMPT_1_2, false));
} else if (request.getApplicationId().equals(APPID_2)) {
return GetApplicationReportResponse.newInstance(
newApplicationReport(APPID_2, APP_ATTEMPT_2_2, true));
}
throw new RuntimeException(
"Unknown applicationId: " + request.getApplicationId());
}).when(mockProtocol).getApplicationReport(any());
doAnswer(invocationOnMock -> {
GetContainerReportRequest request = invocationOnMock.getArgument(0);
ContainerId cId = request.getContainerId();
// for running containers assign node id and NM web address
if (cId.equals(CONTAINER_2_2_2)) {
return GetContainerReportResponse.newInstance(
newContainerReport(cId, NM_ID_1, NM_WEBADDRESS_1));
} else if (cId.equals(CONTAINER_2_2_3)) {
return GetContainerReportResponse.newInstance(
newContainerReport(cId, NM_ID_2, NM_WEBADDRESS_2));
}
// for finished application don't assign node id and NM web address
return GetContainerReportResponse.newInstance(
newContainerReport(cId, null, null));
}).when(mockProtocol).getContainerReport(any());
} catch (Exception ignore) {
fail("Failed to setup WebServletModule class");
}
HsWebServices hsWebServices =
new HsWebServices(appContext, conf, webApp, mockProtocol);
try {
LogServlet logServlet = hsWebServices.getLogServlet();
logServlet = spy(logServlet);
doReturn(null).when(logServlet).getNMWebAddressFromRM(any());
doReturn(NM_WEBADDRESS_1).when(logServlet).getNMWebAddressFromRM(
NM_ID_1.toString());
doReturn(NM_WEBADDRESS_2).when(logServlet).getNMWebAddressFromRM(
NM_ID_2.toString());
hsWebServices.setLogServlet(logServlet);
} catch (Exception ignore) {
fail("Failed to setup WebServletModule class");
}
bind(JAXBContextResolver.class);
bind(HsWebServices.class).toInstance(hsWebServices);
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
bind(ApplicationClientProtocol.class).toInstance(mockProtocol);
serve("/*").with(GuiceContainer.class);
}
}
@BeforeClass
public static void setupClass() throws Exception {
fs = FileSystem.get(conf);
createAggregatedFolders();
GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule()));
}
@Before
public void setUp() {
GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule()));
}
/**
* Generating aggregated container logs for all containers
* except CONTAINER_2_2_2, which is still running.
*
* @throws Exception if failed to create aggregated log files
*/
private static void createAggregatedFolders() throws Exception {
Map<ContainerId, String> contentsApp1 = new HashMap<>();
contentsApp1.put(CONTAINER_1_1_1, "Hello-" + CONTAINER_1_1_1);
contentsApp1.put(CONTAINER_1_1_2, "Hello-" + CONTAINER_1_1_2);
contentsApp1.put(CONTAINER_1_2_1, "Hello-" + CONTAINER_1_2_1);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
LOCAL_ROOT_LOG_DIR, APPID_1, contentsApp1, NM_ID_1, FILE_NAME,
USER, false);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
LOCAL_ROOT_LOG_DIR, APPID_1, Collections.singletonMap(CONTAINER_1_1_3,
"Hello-" + CONTAINER_1_1_3), NM_ID_2, FILE_NAME, USER, false);
Map<ContainerId, String> contentsApp2 = new HashMap<>();
contentsApp2.put(CONTAINER_2_1_1, "Hello-" + CONTAINER_2_1_1);
contentsApp2.put(CONTAINER_2_2_1, "Hello-" + CONTAINER_2_2_1);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
LOCAL_ROOT_LOG_DIR, APPID_2, contentsApp2, NM_ID_1, FILE_NAME,
USER, false);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
LOCAL_ROOT_LOG_DIR, APPID_2, Collections.singletonMap(CONTAINER_2_2_3,
"Hello-" + CONTAINER_2_2_3), NM_ID_2, FILE_NAME, USER, false);
}
public TestHsWebServicesLogs() {
super(new WebAppDescriptor.Builder(
"org.apache.hadoop.mapreduce.v2.hs.webapp")
.contextListenerClass(GuiceServletConfig.class)
.filterClass(com.google.inject.servlet.GuiceFilter.class)
.contextPath("jersey-guice-filter").servletPath("/").build());
}
@AfterClass
public static void tearDownClass() throws Exception {
fs.delete(new Path(REMOTE_LOG_ROOT_DIR), true);
fs.delete(new Path(LOCAL_ROOT_LOG_DIR), true);
}
@Test
public void testGetAggregatedLogsMetaForFinishedApp() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("aggregatedlogs")
.queryParam(YarnWebServiceParams.APP_ID, APPID_1.toString())
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
List<ContainerLogsInfo> responseList =
response.getEntity(new GenericType<List<ContainerLogsInfo>>(){});
Set<String> expectedIdStrings = Sets.newHashSet(
CONTAINER_1_1_1.toString(), CONTAINER_1_1_2.toString(),
CONTAINER_1_1_3.toString(), CONTAINER_1_2_1.toString());
assertResponseList(responseList, expectedIdStrings, false);
for (ContainerLogsInfo logsInfo : responseList) {
String cId = logsInfo.getContainerId();
assertThat(logsInfo.getLogType()).isEqualTo(
ContainerLogAggregationType.AGGREGATED.toString());
if (cId.equals(CONTAINER_1_1_3.toString())) {
assertThat(logsInfo.getNodeId()).isEqualTo(formatNodeId(NM_ID_2));
} else {
assertThat(logsInfo.getNodeId()).isEqualTo(formatNodeId(NM_ID_1));
}
assertSimpleContainerLogFileInfo(logsInfo, cId);
}
}
@Test
public void testGetAggregatedLogsMetaForRunningApp() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("aggregatedlogs")
.queryParam(YarnWebServiceParams.APP_ID, APPID_2.toString())
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
List<ContainerLogsInfo> responseList =
response.getEntity(new GenericType<List<ContainerLogsInfo>>(){});
Set<String> expectedIdStrings = Sets.newHashSet(
CONTAINER_2_1_1.toString(), CONTAINER_2_2_1.toString(),
CONTAINER_2_2_3.toString());
assertResponseList(responseList, expectedIdStrings, true);
for (ContainerLogsInfo logsInfo : responseList) {
String cId = logsInfo.getContainerId();
if (cId.equals(CONTAINER_2_2_3.toString())) {
assertThat(logsInfo.getNodeId()).isEqualTo(formatNodeId(NM_ID_2));
} else {
assertThat(logsInfo.getNodeId()).isEqualTo(formatNodeId(NM_ID_1));
}
if (logsInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
assertSimpleContainerLogFileInfo(logsInfo, cId);
}
}
}
@Test
public void testGetAggregatedLogsMetaForFinishedAppAttempt() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("aggregatedlogs")
.queryParam(
YarnWebServiceParams.APPATTEMPT_ID, APP_ATTEMPT_1_1.toString())
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
List<ContainerLogsInfo> responseList =
response.getEntity(new GenericType<List<ContainerLogsInfo>>(){});
Set<String> expectedIdStrings = Sets.newHashSet(
CONTAINER_1_1_1.toString(), CONTAINER_1_1_2.toString(),
CONTAINER_1_1_3.toString());
assertResponseList(responseList, expectedIdStrings, false);
for (ContainerLogsInfo logsInfo : responseList) {
String cId = logsInfo.getContainerId();
assertThat(logsInfo.getLogType()).isEqualTo(
ContainerLogAggregationType.AGGREGATED.toString());
if (cId.equals(CONTAINER_1_1_3.toString())) {
assertThat(logsInfo.getNodeId()).isEqualTo(formatNodeId(NM_ID_2));
} else {
assertThat(logsInfo.getNodeId()).isEqualTo(formatNodeId(NM_ID_1));
}
assertSimpleContainerLogFileInfo(logsInfo, cId);
}
}
@Test
public void testGetAggregatedLogsMetaForRunningAppAttempt() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("aggregatedlogs")
.queryParam(
YarnWebServiceParams.APPATTEMPT_ID, APP_ATTEMPT_2_2.toString())
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
List<ContainerLogsInfo> responseList =
response.getEntity(new GenericType<List<ContainerLogsInfo>>(){});
Set<String> expectedIdStrings = Sets.newHashSet(
CONTAINER_2_2_1.toString(), CONTAINER_2_2_3.toString());
assertResponseList(responseList, expectedIdStrings, true);
for (ContainerLogsInfo logsInfo : responseList) {
String cId = logsInfo.getContainerId();
if (cId.equals(CONTAINER_2_2_3.toString())) {
assertThat(logsInfo.getNodeId()).isEqualTo(formatNodeId(NM_ID_2));
} else {
assertThat(logsInfo.getNodeId()).isEqualTo(formatNodeId(NM_ID_1));
}
if (logsInfo.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
assertSimpleContainerLogFileInfo(logsInfo, cId);
}
}
}
@Test
public void testGetContainerLogsForFinishedContainer() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("containers")
.path(CONTAINER_1_1_2.toString()).path("logs")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
List<ContainerLogsInfo> responseText =
response.getEntity(new GenericType<List<ContainerLogsInfo>>(){});
assertThat(responseText.size()).isOne();
ContainerLogsInfo logsInfo = responseText.get(0);
assertThat(logsInfo.getLogType()).isEqualTo(
ContainerLogAggregationType.AGGREGATED.toString());
assertThat(logsInfo.getContainerId()).isEqualTo(CONTAINER_1_1_2.toString());
assertSimpleContainerLogFileInfo(logsInfo, CONTAINER_1_1_2.toString());
}
@Test
public void testGetContainerLogsForRunningContainer() throws Exception {
WebResource r = resource();
URI requestURI = r.path("ws").path("v1")
.path("history").path("containers")
.path(CONTAINER_2_2_2.toString())
.path("logs")
.getURI();
String redirectURL = getRedirectURL(requestURI.toString());
assertThat(redirectURL).isNotNull();
assertThat(redirectURL).contains(NM_WEBADDRESS_1,
"ws/v1/node/containers", CONTAINER_2_2_2.toString(), "/logs");
// If we specify NM id, we would re-direct the request
// to this NM's Web Address.
requestURI = r.path("ws").path("v1")
.path("history").path("containers")
.path(CONTAINER_2_2_2.toString())
.path("logs")
.queryParam(YarnWebServiceParams.NM_ID, NM_ID_2.toString())
.getURI();
redirectURL = getRedirectURL(requestURI.toString());
assertThat(redirectURL).isNotNull();
assertThat(redirectURL).contains(NM_WEBADDRESS_2,
"ws/v1/node/containers", CONTAINER_2_2_2.toString(), "/logs");
// If this is the redirect request, we would not re-direct the request
// back and get the aggregated log meta.
ClientResponse response = r.path("ws").path("v1")
.path("history").path("containers")
.path(CONTAINER_2_2_3.toString())
.path("logs")
.queryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE, "true")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
List<ContainerLogsInfo> responseText =
response.getEntity(new GenericType<List<ContainerLogsInfo>>(){});
assertThat(responseText.size()).isEqualTo(2);
ContainerLogsInfo logsInfo1 = responseText.get(0);
ContainerLogsInfo logsInfo2 = responseText.get(1);
assertThat(logsInfo1.getContainerId())
.isEqualTo(CONTAINER_2_2_3.toString());
assertThat(logsInfo2.getContainerId())
.isEqualTo(CONTAINER_2_2_3.toString());
if (logsInfo1.getLogType().equals(
ContainerLogAggregationType.AGGREGATED.toString())) {
assertThat(logsInfo2.getLogType()).isEqualTo(
ContainerLogAggregationType.LOCAL.toString());
assertSimpleContainerLogFileInfo(logsInfo1, CONTAINER_2_2_3.toString());
// this information can be only obtained by the NM.
assertThat(logsInfo2.getContainerLogsInfo()).isNull();
} else {
assertThat(logsInfo1.getLogType()).isEqualTo(
ContainerLogAggregationType.LOCAL.toString());
assertThat(logsInfo2.getLogType()).isEqualTo(
ContainerLogAggregationType.AGGREGATED.toString());
// this information can be only obtained by the NM.
assertThat(logsInfo1.getContainerLogsInfo()).isNull();
assertSimpleContainerLogFileInfo(logsInfo2, CONTAINER_2_2_3.toString());
}
}
@Test
public void testGetContainerLogFileForFinishedContainer() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("containerlogs")
.path(CONTAINER_1_1_2.toString())
.path(FILE_NAME)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertThat(responseText).doesNotContain("Can not find logs",
"Hello-" + CONTAINER_1_1_1);
assertThat(responseText).contains("Hello-" + CONTAINER_1_1_2);
}
@Test
public void testNoRedirectForFinishedContainer() throws Exception {
WebResource r = resource();
URI requestURI = r.path("ws").path("v1")
.path("history").path("containerlogs")
.path(CONTAINER_2_2_1.toString())
.path(FILE_NAME).getURI();
String redirectURL = getRedirectURL(requestURI.toString());
assertThat(redirectURL).isNull();
}
/**
* For local logs we can only check the redirect to the appropriate node.
*/
@Test
public void testGetContainerLogFileForRunningContainer() throws Exception {
WebResource r = resource();
URI requestURI = r.path("ws").path("v1")
.path("history").path("containerlogs")
.path(CONTAINER_2_2_2.toString())
.path(FILE_NAME).getURI();
String redirectURL = getRedirectURL(requestURI.toString());
assertThat(redirectURL).isNotNull();
assertThat(redirectURL).contains(NM_WEBADDRESS_1, "ws/v1/node/containers",
"/logs/" + FILE_NAME, CONTAINER_2_2_2.toString());
// If we specify NM id, we would re-direct the request
// to this NM's Web Address.
requestURI = r.path("ws").path("v1")
.path("history").path("containerlogs")
.path(CONTAINER_2_2_2.toString()).path(FILE_NAME)
.queryParam(YarnWebServiceParams.NM_ID, NM_ID_2.toString())
.getURI();
redirectURL = getRedirectURL(requestURI.toString());
assertThat(redirectURL).isNotNull();
assertThat(redirectURL).contains(NM_WEBADDRESS_2, "ws/v1/node/containers",
"/logs/" + FILE_NAME, CONTAINER_2_2_2.toString());
// If this is the redirect request, we would not re-direct the request
// back and get the aggregated logs.
ClientResponse response = r.path("ws").path("v1")
.path("history").path("containerlogs")
.path(CONTAINER_2_2_3.toString()).path(FILE_NAME)
.queryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE, "true")
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertThat(responseText).isNotNull();
assertThat(responseText).contains("LogAggregationType: "
+ ContainerLogAggregationType.AGGREGATED, "Hello-" + CONTAINER_2_2_3);
}
@Test
public void testNonExistingAppId() {
ApplicationId nonExistingApp = ApplicationId.newInstance(99, 99);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("aggregatedlogs")
.queryParam(YarnWebServiceParams.APP_ID, nonExistingApp.toString())
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertThat(responseText).contains(
WebApplicationException.class.getSimpleName());
assertThat(responseText).contains("Can not find");
}
@Test
public void testBadAppId() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("aggregatedlogs")
.queryParam(YarnWebServiceParams.APP_ID, "some text")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertThat(responseText).contains(
BadRequestException.class.getSimpleName());
assertThat(responseText).contains("Invalid ApplicationId");
}
@Test
public void testNonExistingAppAttemptId() {
ApplicationId nonExistingApp = ApplicationId.newInstance(99, 99);
ApplicationAttemptId nonExistingAppAttemptId =
ApplicationAttemptId.newInstance(nonExistingApp, 1);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("aggregatedlogs")
.queryParam(YarnWebServiceParams.APPATTEMPT_ID,
nonExistingAppAttemptId.toString())
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertThat(responseText).contains(
WebApplicationException.class.getSimpleName());
assertThat(responseText).contains("Can not find");
}
@Test
public void testBadAppAttemptId() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("aggregatedlogs")
.queryParam(YarnWebServiceParams.APPATTEMPT_ID, "some text")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertThat(responseText).contains(
BadRequestException.class.getSimpleName());
assertThat(responseText).contains("Invalid AppAttemptId");
}
@Test
public void testNonExistingContainerId() {
ApplicationId nonExistingApp = ApplicationId.newInstance(99, 99);
ApplicationAttemptId nonExistingAppAttemptId =
ApplicationAttemptId.newInstance(nonExistingApp, 1);
ContainerId nonExistingContainerId =
ContainerId.newContainerId(nonExistingAppAttemptId, 1);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("aggregatedlogs")
.queryParam(YarnWebServiceParams.CONTAINER_ID,
nonExistingContainerId.toString())
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertThat(responseText).contains(
WebApplicationException.class.getSimpleName());
assertThat(responseText).contains("Can not find");
}
@Test
public void testBadContainerId() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("aggregatedlogs")
.queryParam(YarnWebServiceParams.CONTAINER_ID, "some text")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertThat(responseText).contains(
BadRequestException.class.getSimpleName());
assertThat(responseText).contains("Invalid ContainerId");
}
@Test
public void testNonExistingContainerMeta() {
ApplicationId nonExistingApp = ApplicationId.newInstance(99, 99);
ApplicationAttemptId nonExistingAppAttemptId =
ApplicationAttemptId.newInstance(nonExistingApp, 1);
ContainerId nonExistingContainerId =
ContainerId.newContainerId(nonExistingAppAttemptId, 1);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("containers")
.path(nonExistingContainerId.toString()).path("logs")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertThat(responseText).contains(
WebApplicationException.class.getSimpleName());
assertThat(responseText).contains("Can not find");
}
@Test
public void testBadContainerForMeta() {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("history").path("containers")
.path("some text").path("logs")
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
String responseText = response.getEntity(String.class);
assertThat(responseText).contains(
BadRequestException.class.getSimpleName());
assertThat(responseText).contains("Invalid container id");
}
private static void assertSimpleContainerLogFileInfo(
ContainerLogsInfo logsInfo, String cId) {
assertThat(logsInfo.getContainerLogsInfo()).isNotNull();
assertThat(logsInfo.getContainerLogsInfo().size()).isEqualTo(1);
ContainerLogFileInfo fileInfo = logsInfo.getContainerLogsInfo().get(0);
assertThat(fileInfo.getFileName()).isEqualTo(FILE_NAME);
assertThat(fileInfo.getFileSize()).isEqualTo(
String.valueOf(("Hello-" + cId).length()));
}
private static void assertResponseList(List<ContainerLogsInfo> responseList,
Set<String> expectedIdStrings, boolean running) {
Set<String> actualStrings =
responseList.stream()
.map(ContainerLogsInfo::getContainerId)
.collect(Collectors.toSet());
assertThat(actualStrings).isEqualTo(expectedIdStrings);
int expectedSize = expectedIdStrings.size();
assertThat(responseList.size()).isEqualTo(
running ? expectedSize * 2 : expectedSize);
}
private static String formatNodeId(NodeId nodeId) {
return nodeId.toString().replace(":", "_");
}
private static ApplicationReport newApplicationReport(ApplicationId appId,
ApplicationAttemptId appAttemptId, boolean running) {
return ApplicationReport.newInstance(appId, appAttemptId, USER,
"fakeQueue", "fakeApplicationName", "localhost", 0, null,
running ? YarnApplicationState.RUNNING : YarnApplicationState.FINISHED,
"fake an application report", "", 1000L, 1000L, 1000L, null, null,
"", 50f, "fakeApplicationType", null);
}
private static ContainerReport newContainerReport(ContainerId containerId,
NodeId nodeId, String nmWebAddress) {
return ContainerReport.newInstance(containerId, null, nodeId,
Priority.UNDEFINED, 0, 0, null, null, 0, null, nmWebAddress);
}
private static String getRedirectURL(String url) throws Exception {
HttpURLConnection conn = (HttpURLConnection) new URL(url)
.openConnection();
// do not automatically follow the redirection
// otherwise we get too many redirection exceptions
conn.setInstanceFollowRedirects(false);
if (conn.getResponseCode() == HttpServletResponse.SC_TEMPORARY_REDIRECT) {
return conn.getHeaderField("Location");
}
return null;
}
}

View File

@ -356,7 +356,7 @@ private int runCommand(String[] args) throws Exception {
} }
ContainerLogsRequest request = new ContainerLogsRequest(appId, ContainerLogsRequest request = new ContainerLogsRequest(appId, null,
Apps.isApplicationFinalState(appState), appOwner, nodeAddress, Apps.isApplicationFinalState(appState), appOwner, nodeAddress,
null, containerIdStr, localDir, logs, bytes, null); null, containerIdStr, localDir, logs, bytes, null);

View File

@ -19,11 +19,14 @@
package org.apache.hadoop.yarn.logaggregation; package org.apache.hadoop.yarn.logaggregation;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
public class ContainerLogsRequest { public class ContainerLogsRequest {
private ApplicationId appId; private ApplicationId appId;
private ApplicationAttemptId appAttemptId;
private String containerId; private String containerId;
private String nodeId; private String nodeId;
private String nodeHttpAddress; private String nodeHttpAddress;
@ -38,6 +41,7 @@ public ContainerLogsRequest() {}
public ContainerLogsRequest(ContainerLogsRequest request) { public ContainerLogsRequest(ContainerLogsRequest request) {
this.setAppId(request.getAppId()); this.setAppId(request.getAppId());
this.setAppAttemptId(request.getAppAttemptId());
this.setAppFinished(request.isAppFinished()); this.setAppFinished(request.isAppFinished());
this.setAppOwner(request.getAppOwner()); this.setAppOwner(request.getAppOwner());
this.setNodeId(request.getNodeId()); this.setNodeId(request.getNodeId());
@ -50,10 +54,11 @@ public ContainerLogsRequest(ContainerLogsRequest request) {
} }
public ContainerLogsRequest(ApplicationId applicationId, public ContainerLogsRequest(ApplicationId applicationId,
boolean isAppFinished, String owner, ApplicationAttemptId appAttemptId, boolean isAppFinished, String owner,
String address, String httpAddress, String container, String localDir, String address, String httpAddress, String container, String localDir,
Set<String> logs, long bytes, ContainerState containerState) { Set<String> logs, long bytes, ContainerState containerState) {
this.setAppId(applicationId); this.setAppId(applicationId);
this.setAppAttemptId(appAttemptId);
this.setAppFinished(isAppFinished); this.setAppFinished(isAppFinished);
this.setAppOwner(owner); this.setAppOwner(owner);
this.setNodeId(address); this.setNodeId(address);
@ -73,6 +78,14 @@ public void setAppId(ApplicationId appId) {
this.appId = appId; this.appId = appId;
} }
public ApplicationAttemptId getAppAttemptId() {
return this.appAttemptId;
}
public void setAppAttemptId(ApplicationAttemptId appAttemptId) {
this.appAttemptId = appAttemptId;
}
public String getContainerId() { public String getContainerId() {
return containerId; return containerId;
} }

View File

@ -49,7 +49,9 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -594,4 +596,18 @@ protected String aggregatedLogSuffix(String fileName) {
public boolean isFsSupportsChmod() { public boolean isFsSupportsChmod() {
return fsSupportsChmod; return fsSupportsChmod;
} }
protected boolean belongsToAppAttempt(ApplicationAttemptId appAttemptId,
String containerIdStr) {
ContainerId containerId = null;
try {
containerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException exc) {
LOG.warn("Could not parse container id from aggregated log.", exc);
}
if (containerId != null && containerId.getApplicationAttemptId() != null) {
return containerId.getApplicationAttemptId().equals(appAttemptId);
}
return false;
}
} }

View File

@ -71,6 +71,7 @@
import org.apache.hadoop.io.file.tfile.Compression.Algorithm; import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
@ -619,8 +620,9 @@ public List<ContainerLogMeta> readAggregatedLogsMeta(
String nodeId = logRequest.getNodeId(); String nodeId = logRequest.getNodeId();
ApplicationId appId = logRequest.getAppId(); ApplicationId appId = logRequest.getAppId();
String appOwner = logRequest.getAppOwner(); String appOwner = logRequest.getAppOwner();
boolean getAllContainers = (containerIdStr == null || ApplicationAttemptId appAttemptId = logRequest.getAppAttemptId();
containerIdStr.isEmpty()); boolean getAllContainers = ((containerIdStr == null ||
containerIdStr.isEmpty()) && appAttemptId != null);
String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
: LogAggregationUtils.getNodeString(nodeId); : LogAggregationUtils.getNodeString(nodeId);
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
@ -664,8 +666,12 @@ public List<ContainerLogMeta> readAggregatedLogsMeta(
if (getAllContainers) { if (getAllContainers) {
for (Entry<String, List<IndexedFileLogMeta>> log : logMeta for (Entry<String, List<IndexedFileLogMeta>> log : logMeta
.getLogMetas().entrySet()) { .getLogMetas().entrySet()) {
String currentContainerIdStr = log.getKey();
if (!belongsToAppAttempt(appAttemptId, currentContainerIdStr)) {
continue;
}
ContainerLogMeta meta = new ContainerLogMeta( ContainerLogMeta meta = new ContainerLogMeta(
log.getKey().toString(), curNodeId); log.getKey(), curNodeId);
for (IndexedFileLogMeta aMeta : log.getValue()) { for (IndexedFileLogMeta aMeta : log.getValue()) {
meta.addLogMeta(aMeta.getFileName(), Long.toString( meta.addLogMeta(aMeta.getFileName(), Long.toString(
aMeta.getFileSize()), aMeta.getFileSize()),

View File

@ -27,6 +27,8 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.commons.math3.util.Pair; import org.apache.commons.math3.util.Pair;
@ -264,7 +266,10 @@ public List<ContainerLogMeta> readAggregatedLogsMeta(
String nodeId = logRequest.getNodeId(); String nodeId = logRequest.getNodeId();
ApplicationId appId = logRequest.getAppId(); ApplicationId appId = logRequest.getAppId();
String appOwner = logRequest.getAppOwner(); String appOwner = logRequest.getAppOwner();
boolean getAllContainers = (containerIdStr == null); ApplicationAttemptId appAttemptId = logRequest.getAppAttemptId();
boolean getAllContainers = (containerIdStr == null &&
appAttemptId == null);
boolean getOnlyOneContainer = containerIdStr != null;
String nodeIdStr = (nodeId == null) ? null String nodeIdStr = (nodeId == null) ? null
: LogAggregationUtils.getNodeString(nodeId); : LogAggregationUtils.getNodeString(nodeId);
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
@ -297,7 +302,8 @@ public List<ContainerLogMeta> readAggregatedLogsMeta(
LogKey key = new LogKey(); LogKey key = new LogKey();
valueStream = reader.next(key); valueStream = reader.next(key);
while (valueStream != null) { while (valueStream != null) {
if (getAllContainers || (key.toString().equals(containerIdStr))) { if (getAllContainers || (key.toString().equals(containerIdStr)) ||
belongsToAppAttempt(appAttemptId, key.toString())) {
ContainerLogMeta containerLogMeta = new ContainerLogMeta( ContainerLogMeta containerLogMeta = new ContainerLogMeta(
key.toString(), thisNodeFile.getPath().getName()); key.toString(), thisNodeFile.getPath().getName());
while (true) { while (true) {
@ -314,7 +320,7 @@ public List<ContainerLogMeta> readAggregatedLogsMeta(
} }
} }
containersLogMeta.add(containerLogMeta); containersLogMeta.add(containerLogMeta);
if (!getAllContainers) { if (getOnlyOneContainer) {
break; break;
} }
} }

View File

@ -52,23 +52,21 @@ private TestContainerLogsUtils() {}
* @param conf the configuration * @param conf the configuration
* @param fs the FileSystem * @param fs the FileSystem
* @param rootLogDir the root log directory * @param rootLogDir the root log directory
* @param containerId the containerId * @param appId the application id
* @param containerToContent mapping between container id and its content
* @param nodeId the nodeId * @param nodeId the nodeId
* @param fileName the log file name * @param fileName the log file name
* @param user the application user * @param user the application user
* @param content the log context * @param deleteRemoteLogDir whether to delete remote log dir.
* @param deletePreviousRemoteLogDir whether to delete remote log dir.
* @throws IOException if we can not create log files locally * @throws IOException if we can not create log files locally
* or we can not upload container logs into RemoteFS. * or we can not upload container logs into RemoteFS.
*/ */
public static void createContainerLogFileInRemoteFS(Configuration conf, public static void createContainerLogFileInRemoteFS(Configuration conf,
FileSystem fs, String rootLogDir, ContainerId containerId, NodeId nodeId, FileSystem fs, String rootLogDir, ApplicationId appId,
String fileName, String user, String content, Map<ContainerId, String> containerToContent, NodeId nodeId,
boolean deleteRemoteLogDir) throws Exception { String fileName, String user, boolean deleteRemoteLogDir)
throws Exception {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
//prepare the logs for remote directory
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
// create local logs // create local logs
List<String> rootLogDirList = new ArrayList<String>(); List<String> rootLogDirList = new ArrayList<String>();
rootLogDirList.add(rootLogDir); rootLogDirList.add(rootLogDir);
@ -83,8 +81,7 @@ public static void createContainerLogFileInRemoteFS(Configuration conf,
} }
assertTrue(fs.mkdirs(appLogsDir)); assertTrue(fs.mkdirs(appLogsDir));
createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName, createContainerLogInLocalDir(appLogsDir, containerToContent, fs, fileName);
content);
// upload container logs to remote log dir // upload container logs to remote log dir
LogAggregationFileControllerFactory factory = LogAggregationFileControllerFactory factory =
@ -98,27 +95,33 @@ public static void createContainerLogFileInRemoteFS(Configuration conf,
fs.delete(path, true); fs.delete(path, true);
} }
assertTrue(fs.mkdirs(path)); assertTrue(fs.mkdirs(path));
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId, uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId, appId,
containerId, path, fs); containerToContent.keySet(), path);
} }
private static void createContainerLogInLocalDir(Path appLogsDir, private static void createContainerLogInLocalDir(Path appLogsDir,
ContainerId containerId, FileSystem fs, String fileName, String content) Map<ContainerId, String> containerToContent, FileSystem fs,
throws IOException{ String fileName) throws IOException {
Path containerLogsDir = new Path(appLogsDir, containerId.toString()); for (Map.Entry<ContainerId, String> containerAndContent :
if (fs.exists(containerLogsDir)) { containerToContent.entrySet()) {
fs.delete(containerLogsDir, true); ContainerId containerId = containerAndContent.getKey();
String content = containerAndContent.getValue();
Path containerLogsDir = new Path(appLogsDir, containerId.toString());
if (fs.exists(containerLogsDir)) {
fs.delete(containerLogsDir, true);
}
assertTrue(fs.mkdirs(containerLogsDir));
Writer writer =
new FileWriter(new File(containerLogsDir.toString(), fileName));
writer.write(content);
writer.close();
} }
assertTrue(fs.mkdirs(containerLogsDir));
Writer writer =
new FileWriter(new File(containerLogsDir.toString(), fileName));
writer.write(content);
writer.close();
} }
private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi, private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId, Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
ContainerId containerId, Path appDir, FileSystem fs) throws Exception { ApplicationId appId, Iterable<ContainerId> containerIds, Path appDir)
throws Exception {
Path path = Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)); new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
LogAggregationFileControllerFactory factory LogAggregationFileControllerFactory factory
@ -128,16 +131,16 @@ private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
try { try {
Map<ApplicationAccessType, String> appAcls = new HashMap<>(); Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
LogAggregationFileControllerContext context LogAggregationFileControllerContext context
= new LogAggregationFileControllerContext( = new LogAggregationFileControllerContext(
path, path, true, 1000, path, path, true, 1000,
appId, appAcls, nodeId, ugi); appId, appAcls, nodeId, ugi);
fileController.initializeWriter(context); fileController.initializeWriter(context);
fileController.write(new AggregatedLogFormat.LogKey(containerId), for (ContainerId containerId : containerIds) {
new AggregatedLogFormat.LogValue(rootLogDirs, containerId, fileController.write(new AggregatedLogFormat.LogKey(containerId),
ugi.getShortUserName())); new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
ugi.getShortUserName()));
}
} finally { } finally {
fileController.closeWriter(); fileController.closeWriter();
} }

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.server.webapp.LogServlet; import org.apache.hadoop.yarn.server.webapp.LogServlet;
import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.WrappedLogMetaRequest;
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams; import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
@ -225,7 +226,11 @@ public Response getContainerLogsInfo(
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE) @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
@DefaultValue("false") boolean redirected_from_node) { @DefaultValue("false") boolean redirected_from_node) {
initForReadableEndpoints(res); initForReadableEndpoints(res);
return logServlet.getContainerLogsInfo(req, containerIdStr, nmId,
WrappedLogMetaRequest.Builder logMetaRequestBuilder =
LogServlet.createRequestFromContainerId(containerIdStr);
return logServlet.getContainerLogsInfo(req, logMetaRequestBuilder, nmId,
redirected_from_node, null); redirected_from_node, null);
} }

View File

@ -33,6 +33,7 @@
import java.net.URL; import java.net.URL;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
@ -554,11 +555,13 @@ public void testContainerLogsForFinishedApps() throws Exception {
ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100); ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1, nodeId, fileName, user, rootLogDir, appId, Collections.singletonMap(containerId1,
("Hello." + containerId1), true); "Hello." + containerId1),
nodeId, fileName, user, true);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId100, nodeId2, fileName, user, rootLogDir, appId, Collections.singletonMap(containerId100,
("Hello." + containerId100), false); "Hello." + containerId100),
nodeId2, fileName, user, false);
// test whether we can find container log from remote diretory if // test whether we can find container log from remote diretory if
// the containerInfo for this container could be fetched from AHS. // the containerInfo for this container could be fetched from AHS.
WebResource r = resource(); WebResource r = resource();
@ -613,8 +616,10 @@ public void testContainerLogsForFinishedApps() throws Exception {
appAttemptId100, 1); appAttemptId100, 1);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1ForApp100, nodeId, fileName, user, rootLogDir, appId100,
("Hello." + containerId1ForApp100), true); Collections.singletonMap(containerId1ForApp100,
"Hello." + containerId1ForApp100),
nodeId, fileName, user, true);
r = resource(); r = resource();
response = r.path("ws").path("v1") response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs") .path("applicationhistory").path("containerlogs")
@ -767,7 +772,8 @@ public void testContainerLogsForRunningApps() throws Exception {
String content = "Hello." + containerId1000; String content = "Hello." + containerId1000;
NodeId nodeId = NodeId.newInstance("test host", 100); NodeId nodeId = NodeId.newInstance("test host", 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1000, nodeId, fileName, user, content, true); rootLogDir, appId, Collections.singletonMap(containerId1000, content),
nodeId, fileName, user, true);
r = resource(); r = resource();
ClientResponse response = r.path("ws").path("v1") ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containerlogs") .path("applicationhistory").path("containerlogs")
@ -806,7 +812,8 @@ public void testContainerLogsForRunningApps() throws Exception {
String content1 = "Hello." + containerId1; String content1 = "Hello." + containerId1;
NodeId nodeId1 = NodeId.fromString(NM_ID); NodeId nodeId1 = NodeId.fromString(NM_ID);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1, nodeId1, fileName, user, content1, true); rootLogDir, appId, Collections.singletonMap(containerId1, content1),
nodeId1, fileName, user, true);
response = r.path("ws").path("v1") response = r.path("ws").path("v1")
.path("applicationhistory").path("containers") .path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs").path(fileName) .path(containerId1.toString()).path("logs").path(fileName)
@ -866,7 +873,8 @@ public void testContainerLogsMetaForRunningApps() throws Exception {
String content = "Hello." + containerId1000; String content = "Hello." + containerId1000;
NodeId nodeId = NodeId.newInstance("test host", 100); NodeId nodeId = NodeId.newInstance("test host", 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1000, nodeId, fileName, user, content, true); rootLogDir, appId, Collections.singletonMap(containerId1000, content),
nodeId, fileName, user, true);
ClientResponse response = r.path("ws").path("v1") ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containers") .path("applicationhistory").path("containers")
.path(containerId1000.toString()).path("logs") .path(containerId1000.toString()).path("logs")
@ -932,7 +940,8 @@ public void testContainerLogsMetaForFinishedApps() throws Exception {
String content = "Hello." + containerId1; String content = "Hello." + containerId1;
NodeId nodeId = NodeId.newInstance("test host", 100); NodeId nodeId = NodeId.newInstance("test host", 100);
TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs,
rootLogDir, containerId1, nodeId, fileName, user, content, true); rootLogDir, appId, Collections.singletonMap(containerId1, content),
nodeId, fileName, user, true);
WebResource r = resource(); WebResource r = resource();
ClientResponse response = r.path("ws").path("v1") ClientResponse response = r.path("ws").path("v1")

View File

@ -141,6 +141,16 @@
<artifactId>curator-test</artifactId> <artifactId>curator-test</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-core</artifactId>
</dependency>
<dependency>
<groupId>com.sun.jersey.jersey-test-framework</groupId>
<artifactId>jersey-test-framework-core</artifactId>
<version>1.19</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -24,9 +24,13 @@
import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.client.UniformInterfaceException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.NotFoundException;
@ -36,8 +40,12 @@
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.Response.Status;
import java.util.ArrayList;
import java.util.List;
/** /**
* Extracts aggregated logs and related information. * Extracts aggregated logs and related information.
@ -65,46 +73,172 @@ public String getNMWebAddressFromRM(String nodeId)
return LogWebServiceUtils.getNMWebAddressFromRM(getConf(), nodeId); return LogWebServiceUtils.getNMWebAddressFromRM(getConf(), nodeId);
} }
private static List<ContainerLogsInfo> convertToContainerLogsInfo(
List<ContainerLogMeta> containerLogMetas,
boolean emptyLocalContainerLogMeta) {
List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
for (ContainerLogMeta meta : containerLogMetas) {
ContainerLogsInfo logInfo =
new ContainerLogsInfo(meta, ContainerLogAggregationType.AGGREGATED);
containersLogsInfo.add(logInfo);
if (emptyLocalContainerLogMeta) {
ContainerLogMeta emptyMeta =
new ContainerLogMeta(logInfo.getContainerId(),
logInfo.getNodeId() == null ? "N/A" : logInfo.getNodeId());
ContainerLogsInfo empty =
new ContainerLogsInfo(emptyMeta, ContainerLogAggregationType.LOCAL);
containersLogsInfo.add(empty);
}
}
return containersLogsInfo;
}
private static Response getContainerLogMeta(
WrappedLogMetaRequest request, boolean emptyLocalContainerLogMeta) {
try {
List<ContainerLogMeta> containerLogMeta = request.getContainerLogMetas();
if (containerLogMeta.isEmpty()) {
throw new NotFoundException("Can not get log meta for request.");
}
List<ContainerLogsInfo> containersLogsInfo = convertToContainerLogsInfo(
containerLogMeta, emptyLocalContainerLogMeta);
GenericEntity<List<ContainerLogsInfo>> meta =
new GenericEntity<List<ContainerLogsInfo>>(containersLogsInfo) {
};
Response.ResponseBuilder response = Response.ok(meta);
// Sending the X-Content-Type-Options response header with the value
// nosniff will prevent Internet Explorer from MIME-sniffing a response
// away from the declared content-type.
response.header("X-Content-Type-Options", "nosniff");
return response.build();
} catch (Exception ex) {
LOG.debug("Exception during request", ex);
throw new WebApplicationException(ex);
}
}
/**
* Validates whether the user has provided at least one query param for
* the request. Also validates that if multiple query params are provided,
* they do not contradict.
*/
private void validateUserInput(ApplicationId applicationId,
ApplicationAttemptId applicationAttemptId, ContainerId containerId) {
// At least one field should be set
if (applicationId == null && applicationAttemptId == null &&
containerId == null) {
throw new IllegalArgumentException("Should set application id, " +
"application attempt id or container id.");
}
// container id should belong to the app attempt and the app id,
// if provided
if (containerId != null) {
if (applicationAttemptId != null && !applicationAttemptId.equals(
containerId.getApplicationAttemptId())) {
throw new IllegalArgumentException(
String.format(
"Container %s does not belong to application attempt %s!",
containerId, applicationAttemptId));
}
if (applicationId != null && !applicationId.equals(
containerId.getApplicationAttemptId().getApplicationId())) {
throw new IllegalArgumentException(
String.format(
"Container %s does not belong to application %s!",
containerId, applicationId));
}
}
// app attempt id should match the app id, if provided
if (applicationAttemptId != null && applicationId != null &&
!applicationId.equals(applicationAttemptId.getApplicationId())) {
throw new IllegalArgumentException(
String.format(
"Application attempt %s does not belong to application %s!",
applicationAttemptId, applicationId));
}
}
public Response getLogsInfo(HttpServletRequest hsr, String appIdStr,
String appAttemptIdStr, String containerIdStr, String nmId,
boolean redirectedFromNode) {
ApplicationId appId = null;
if (appIdStr != null) {
try {
appId = ApplicationId.fromString(appIdStr);
} catch (IllegalArgumentException iae) {
throw new BadRequestException(iae);
}
}
ApplicationAttemptId appAttemptId = null;
if (appAttemptIdStr != null) {
try {
appAttemptId = ApplicationAttemptId.fromString(appAttemptIdStr);
} catch (IllegalArgumentException iae) {
throw new BadRequestException(iae);
}
}
ContainerId containerId = null;
if (containerIdStr != null) {
try {
containerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException iae) {
throw new BadRequestException(iae);
}
}
validateUserInput(appId, appAttemptId, containerId);
WrappedLogMetaRequest.Builder logMetaRequestBuilder =
WrappedLogMetaRequest.builder()
.setApplicationId(appId)
.setApplicationAttemptId(appAttemptId)
.setContainerId(containerIdStr);
return getContainerLogsInfo(hsr, logMetaRequestBuilder, nmId,
redirectedFromNode, null);
}
/** /**
* Returns information about the logs for a specific container. * Returns information about the logs for a specific container.
* *
* @param req the {@link HttpServletRequest} * @param req the {@link HttpServletRequest}
* @param containerIdStr container id * @param builder builder instance for the log meta request
* @param nmId NodeManager id * @param nmId NodeManager id
* @param redirectedFromNode whether the request was redirected * @param redirectedFromNode whether the request was redirected
* @param clusterId the id of the cluster * @param clusterId the id of the cluster
* @return {@link Response} object containing information about the logs * @return {@link Response} object containing information about the logs
*/ */
public Response getContainerLogsInfo(HttpServletRequest req, public Response getContainerLogsInfo(HttpServletRequest req,
String containerIdStr, String nmId, boolean redirectedFromNode, WrappedLogMetaRequest.Builder builder,
String nmId, boolean redirectedFromNode,
String clusterId) { String clusterId) {
ContainerId containerId = null;
try {
containerId = ContainerId.fromString(containerIdStr);
} catch (IllegalArgumentException e) {
throw new BadRequestException("invalid container id, " + containerIdStr);
}
ApplicationId appId = containerId.getApplicationAttemptId() builder.setFactory(factory);
.getApplicationId();
BasicAppInfo appInfo; BasicAppInfo appInfo;
try { try {
appInfo = appInfoProvider.getApp(req, appId.toString(), clusterId); appInfo = appInfoProvider.getApp(req, builder.getAppId(), clusterId);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("Could not obtain appInfo object from provider.", ex);
// directly find logs from HDFS. // directly find logs from HDFS.
return LogWebServiceUtils return getContainerLogMeta(builder.build(), false);
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
false);
} }
// if the application finishes, directly find logs // if the application finishes, directly find logs
// from HDFS. // from HDFS.
if (Apps.isApplicationFinalState(appInfo.getAppState())) { if (Apps.isApplicationFinalState(appInfo.getAppState())) {
return LogWebServiceUtils return getContainerLogMeta(builder.build(), false);
.getContainerLogMeta(factory, appId, null, null, containerIdStr,
false);
} }
if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) { if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
String appOwner = appInfo.getUser(); String appOwner = appInfo.getUser();
builder.setAppOwner(appOwner);
WrappedLogMetaRequest request = builder.build();
String nodeHttpAddress = null; String nodeHttpAddress = null;
if (nmId != null && !nmId.isEmpty()) { if (nmId != null && !nmId.isEmpty()) {
try { try {
@ -114,17 +248,17 @@ public Response getContainerLogsInfo(HttpServletRequest req,
} }
} }
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) { if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
try { if (request.getContainerId() != null) {
nodeHttpAddress = appInfoProvider.getNodeHttpAddress( try {
req, appId.toString(), nodeHttpAddress = appInfoProvider.getNodeHttpAddress(
containerId.getApplicationAttemptId().toString(), req, request.getAppId(), request.getAppAttemptId(),
containerId.toString(), clusterId); request.getContainerId().toString(), clusterId);
} catch (Exception ex) { } catch (Exception ex) {
// return log meta for the aggregated logs if exists. LOG.warn("Could not obtain node HTTP address from provider.", ex);
// It will also return empty log meta for the local logs. // return log meta for the aggregated logs if exists.
return LogWebServiceUtils // It will also return empty log meta for the local logs.
.getContainerLogMeta(factory, appId, appOwner, null, return getContainerLogMeta(request, true);
containerIdStr, true); }
} }
// make sure nodeHttpAddress is not null and not empty. Otherwise, // make sure nodeHttpAddress is not null and not empty. Otherwise,
// we would only get log meta for aggregated logs instead of // we would only get log meta for aggregated logs instead of
@ -135,11 +269,15 @@ public Response getContainerLogsInfo(HttpServletRequest req,
// It will also return empty log meta for the local logs. // It will also return empty log meta for the local logs.
// If this is the redirect request from NM, we should not // If this is the redirect request from NM, we should not
// re-direct the request back. Simply output the aggregated log meta. // re-direct the request back. Simply output the aggregated log meta.
return LogWebServiceUtils return getContainerLogMeta(request, true);
.getContainerLogMeta(factory, appId, appOwner, null,
containerIdStr, true);
} }
} }
ContainerId containerId = request.getContainerId();
if (containerId == null) {
throw new WebApplicationException(
new Exception("Could not redirect to node, as app attempt or " +
"application logs are requested."));
}
String uri = "/" + containerId.toString() + "/logs"; String uri = "/" + containerId.toString() + "/logs";
String resURI = JOINER.join( String resURI = JOINER.join(
LogWebServiceUtils.getAbsoluteNMWebAddress(getConf(), LogWebServiceUtils.getAbsoluteNMWebAddress(getConf(),
@ -192,7 +330,7 @@ public Response getLogFile(HttpServletRequest req, String containerIdStr,
try { try {
appInfo = appInfoProvider.getApp(req, appId.toString(), clusterId); appInfo = appInfoProvider.getApp(req, appId.toString(), clusterId);
} catch (Exception ex) { } catch (Exception ex) {
// directly find logs from HDFS. LOG.warn("Could not obtain appInfo object from provider.", ex);
return LogWebServiceUtils return LogWebServiceUtils
.sendStreamOutputResponse(factory, appId, null, null, containerIdStr, .sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
filename, format, length, false); filename, format, length, false);
@ -222,6 +360,7 @@ public Response getLogFile(HttpServletRequest req, String containerIdStr,
containerId.getApplicationAttemptId().toString(), containerId.getApplicationAttemptId().toString(),
containerId.toString(), clusterId); containerId.toString(), clusterId);
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn("Could not obtain node HTTP address from provider.", ex);
// output the aggregated logs // output the aggregated logs
return LogWebServiceUtils return LogWebServiceUtils
.sendStreamOutputResponse(factory, appId, appOwner, null, .sendStreamOutputResponse(factory, appId, appOwner, null,
@ -258,4 +397,16 @@ public Response getLogFile(HttpServletRequest req, String containerIdStr,
"The application is not at Running or Finished State."); "The application is not at Running or Finished State.");
} }
} }
public static WrappedLogMetaRequest.Builder createRequestFromContainerId(
String containerIdStr) {
WrappedLogMetaRequest.Builder logMetaRequestBuilder =
WrappedLogMetaRequest.builder();
try {
logMetaRequestBuilder.setContainerId(containerIdStr);
} catch (IllegalArgumentException e) {
throw new BadRequestException("Invalid container id: " + containerIdStr);
}
return logMetaRequestBuilder;
}
} }

View File

@ -151,7 +151,11 @@ public Response getContainerLogsInfo(@Context HttpServletRequest req,
@DefaultValue("false") boolean redirectedFromNode, @DefaultValue("false") boolean redirectedFromNode,
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) { @QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
initForReadableEndpoints(res); initForReadableEndpoints(res);
return logServlet.getContainerLogsInfo(req, containerIdStr, nmId,
WrappedLogMetaRequest.Builder logMetaRequestBuilder =
LogServlet.createRequestFromContainerId(containerIdStr);
return logServlet.getContainerLogsInfo(req, logMetaRequestBuilder, nmId,
redirectedFromNode, clusterId); redirectedFromNode, clusterId);
} }

View File

@ -29,29 +29,25 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils; import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException; import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.GenericEntity;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.StreamingOutput;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.reflect.UndeclaredThrowableException; import java.lang.reflect.UndeclaredThrowableException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
/** /**
@ -60,56 +56,14 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public final class LogWebServiceUtils { public final class LogWebServiceUtils {
private static final Logger LOG =
LoggerFactory.getLogger(LogWebServiceUtils.class);
private LogWebServiceUtils() { private LogWebServiceUtils() {
} }
private static final Joiner DOT_JOINER = Joiner.on(". "); private static final Joiner DOT_JOINER = Joiner.on(". ");
public static Response getContainerLogMeta(
LogAggregationFileControllerFactory factory, ApplicationId appId,
String appOwner, final String nodeId, final String containerIdStr,
boolean emptyLocalContainerLogMeta) {
try {
ContainerLogsRequest request = new ContainerLogsRequest();
request.setAppId(appId);
request.setAppOwner(appOwner);
request.setContainerId(containerIdStr);
request.setNodeId(nodeId);
List<ContainerLogMeta> containerLogMeta =
factory.getFileControllerForRead(appId, appOwner)
.readAggregatedLogsMeta(request);
if (containerLogMeta.isEmpty()) {
throw new NotFoundException(
"Can not get log meta for container: " + containerIdStr);
}
List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
for (ContainerLogMeta meta : containerLogMeta) {
ContainerLogsInfo logInfo =
new ContainerLogsInfo(meta, ContainerLogAggregationType.AGGREGATED);
containersLogsInfo.add(logInfo);
}
if (emptyLocalContainerLogMeta) {
ContainerLogMeta emptyMeta =
new ContainerLogMeta(containerIdStr, "N/A");
ContainerLogsInfo empty =
new ContainerLogsInfo(emptyMeta, ContainerLogAggregationType.LOCAL);
containersLogsInfo.add(empty);
}
GenericEntity<List<ContainerLogsInfo>> meta =
new GenericEntity<List<ContainerLogsInfo>>(containersLogsInfo) {
};
Response.ResponseBuilder response = Response.ok(meta);
// Sending the X-Content-Type-Options response header with the value
// nosniff will prevent Internet Explorer from MIME-sniffing a response
// away from the declared content-type.
response.header("X-Content-Type-Options", "nosniff");
return response.build();
} catch (Exception ex) {
throw new WebApplicationException(ex);
}
}
public static Response sendStreamOutputResponse( public static Response sendStreamOutputResponse(
LogAggregationFileControllerFactory factory, ApplicationId appId, LogAggregationFileControllerFactory factory, ApplicationId appId,
String appOwner, String nodeId, String containerIdStr, String fileName, String appOwner, String nodeId, String containerIdStr, String fileName,
@ -131,6 +85,7 @@ public static Response sendStreamOutputResponse(
getStreamingOutput(factory, appId, appOwner, nodeId, containerIdStr, getStreamingOutput(factory, appId, appOwner, nodeId, containerIdStr,
fileName, bytes, printEmptyLocalContainerLog); fileName, bytes, printEmptyLocalContainerLog);
} catch (Exception ex) { } catch (Exception ex) {
LOG.debug("Exception", ex);
return createBadResponse(Response.Status.INTERNAL_SERVER_ERROR, return createBadResponse(Response.Status.INTERNAL_SERVER_ERROR,
ex.getMessage()); ex.getMessage());
} }

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webapp;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
/**
* WrappedLogMetaRequest is wrapping a log request initiated by the client.
* This wrapper class translates the request to a {@link ContainerLogsRequest}
* and calls #readAggregatedLogsMeta on the
* {@link LogAggregationFileController}.
* class.
*/
public class WrappedLogMetaRequest {
private final LogAggregationFileControllerFactory factory;
private final ApplicationId appId;
private final String appOwner;
private final ContainerId containerId;
private final String nodeId;
private final ApplicationAttemptId applicationAttemptId;
private WrappedLogMetaRequest(Builder builder) {
this.factory = builder.factory;
this.appId = builder.appId;
this.appOwner = builder.appOwner;
this.containerId = builder.containerId;
this.nodeId = builder.nodeId;
this.applicationAttemptId = builder.applicationAttemptId;
}
public static class Builder {
private LogAggregationFileControllerFactory factory;
private ApplicationId appId;
private String appOwner;
private ContainerId containerId;
private String nodeId;
private ApplicationAttemptId applicationAttemptId;
Builder() {
}
Builder setFactory(LogAggregationFileControllerFactory logFactory) {
this.factory = logFactory;
return this;
}
public Builder setApplicationId(ApplicationId applicationId) {
this.appId = applicationId;
return this;
}
Builder setNodeId(String nid) {
this.nodeId = nid;
return this;
}
public Builder setContainerId(@Nullable String containerIdStr) {
if (containerIdStr != null) {
this.containerId = ContainerId.fromString(containerIdStr);
}
return this;
}
Builder setAppOwner(String user) {
this.appOwner = user;
return this;
}
public Builder setApplicationAttemptId(ApplicationAttemptId appAttemptId) {
this.applicationAttemptId = appAttemptId;
return this;
}
String getAppId() {
return WrappedLogMetaRequest.getAppId(appId, applicationAttemptId,
containerId);
}
WrappedLogMetaRequest build() {
if (this.factory == null) {
throw new AssertionError("WrappedLogMetaRequest's builder should be " +
"given a LogAggregationFileControllerFactory as parameter.");
}
return new WrappedLogMetaRequest(this);
}
}
public static Builder builder() {
return new Builder();
}
private static String getAppId(ApplicationId appId,
ApplicationAttemptId applicationAttemptId, ContainerId containerId) {
if (appId == null) {
if (applicationAttemptId == null) {
return containerId.getApplicationAttemptId().getApplicationId()
.toString();
} else {
return applicationAttemptId.getApplicationId().toString();
}
}
return appId.toString();
}
public String getAppId() {
return getAppId(appId, applicationAttemptId, containerId);
}
public String getAppAttemptId() {
if (applicationAttemptId == null) {
if (containerId != null) {
return containerId.getApplicationAttemptId().toString();
} else {
return null;
}
} else {
return applicationAttemptId.toString();
}
}
public ContainerId getContainerId() {
return containerId;
}
/**
* Constructs a {@link ContainerLogsRequest} object, and obtains
* {@link ContainerLogsRequest} from the corresponding
* {@link LogAggregationFileController}.
*
* @return list of {@link ContainerLogMeta} objects that belong
* to the application, attempt or container
*/
public List<ContainerLogMeta> getContainerLogMetas() throws IOException {
ApplicationId applicationId = ApplicationId.fromString(getAppId());
ContainerLogsRequest request = new ContainerLogsRequest();
request.setAppId(applicationId);
request.setAppAttemptId(applicationAttemptId);
if (containerId != null) {
request.setContainerId(containerId.toString());
}
request.setAppOwner(appOwner);
request.setNodeId(nodeId);
return factory.getFileControllerForRead(applicationId, appOwner)
.readAggregatedLogsMeta(request);
}
}

View File

@ -29,6 +29,8 @@
public interface YarnWebServiceParams { public interface YarnWebServiceParams {
// the params used in container-log related web services // the params used in container-log related web services
String APP_ID = "appid";
String APPATTEMPT_ID = "appattemptid";
String CONTAINER_ID = "containerid"; String CONTAINER_ID = "containerid";
String CONTAINER_LOG_FILE_NAME = "filename"; String CONTAINER_LOG_FILE_NAME = "filename";
String RESPONSE_CONTENT_FORMAT = "format"; String RESPONSE_CONTENT_FORMAT = "format";

View File

@ -24,7 +24,6 @@
import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
@ -61,7 +60,7 @@ public class ContainerLogsInfo {
public ContainerLogsInfo() {} public ContainerLogsInfo() {}
public ContainerLogsInfo(ContainerLogMeta logMeta, public ContainerLogsInfo(ContainerLogMeta logMeta,
ContainerLogAggregationType logType) throws YarnException { ContainerLogAggregationType logType) {
this.containerLogsInfo = new ArrayList<ContainerLogFileInfo>( this.containerLogsInfo = new ArrayList<ContainerLogFileInfo>(
logMeta.getContainerLogMeta()); logMeta.getContainerLogMeta());
this.logType = logType.toString(); this.logType = logType.toString();

View File

@ -94,6 +94,7 @@
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -728,8 +729,9 @@ private void testContainerLogs(WebResource r, ContainerId containerId,
String aggregatedLogMessage = "This is aggregated ;og."; String aggregatedLogMessage = "This is aggregated ;og.";
TestContainerLogsUtils.createContainerLogFileInRemoteFS( TestContainerLogsUtils.createContainerLogFileInRemoteFS(
nmContext.getConf(), FileSystem.get(nmContext.getConf()), nmContext.getConf(), FileSystem.get(nmContext.getConf()),
tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(), tempLogDir.getAbsolutePath(), appId,
aggregatedLogFile, "user", aggregatedLogMessage, true); Collections.singletonMap(containerId, aggregatedLogMessage),
nmContext.getNodeId(), aggregatedLogFile, "user", true);
r1 = resource(); r1 = resource();
response = r1.path("ws").path("v1").path("node") response = r1.path("ws").path("v1").path("node")
.path("containers").path(containerIdStr) .path("containers").path(containerIdStr)
@ -757,8 +759,9 @@ private void testContainerLogs(WebResource r, ContainerId containerId,
// Test whether we could get aggregated log as well // Test whether we could get aggregated log as well
TestContainerLogsUtils.createContainerLogFileInRemoteFS( TestContainerLogsUtils.createContainerLogFileInRemoteFS(
nmContext.getConf(), FileSystem.get(nmContext.getConf()), nmContext.getConf(), FileSystem.get(nmContext.getConf()),
tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(), tempLogDir.getAbsolutePath(), appId,
filename, "user", aggregatedLogMessage, true); Collections.singletonMap(containerId, aggregatedLogMessage),
nmContext.getNodeId(), filename, "user", true);
response = r.path(filename) response = r.path(filename)
.accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
responseText = response.getEntity(String.class); responseText = response.getEntity(String.class);