From 6aec712c6c1e8ee84e323df9a461ea77554000e3 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Tue, 11 Feb 2020 09:18:44 +0100 Subject: [PATCH] YARN-10101. Support listing of aggregated logs for containers belonging to an application attempt. Contributed by Adam Antal --- .../mapreduce/v2/hs/webapp/HsWebServices.java | 40 +- .../v2/hs/webapp/TestHsWebServicesLogs.java | 780 ++++++++++++++++++ .../hadoop/yarn/client/cli/LogsCLI.java | 2 +- .../logaggregation/ContainerLogsRequest.java | 15 +- .../LogAggregationFileController.java | 16 + .../LogAggregationIndexedFileController.java | 12 +- .../tfile/LogAggregationTFileController.java | 12 +- .../TestContainerLogsUtils.java | 61 +- .../webapp/AHSWebServices.java | 7 +- .../webapp/TestAHSWebServices.java | 29 +- .../hadoop-yarn-server-common/pom.xml | 10 + .../hadoop/yarn/server/webapp/LogServlet.java | 215 ++++- .../yarn/server/webapp/LogWebService.java | 6 +- .../server/webapp/LogWebServiceUtils.java | 55 +- .../server/webapp/WrappedLogMetaRequest.java | 174 ++++ .../server/webapp/YarnWebServiceParams.java | 2 + .../server/webapp/dao/ContainerLogsInfo.java | 3 +- .../nodemanager/webapp/TestNMWebServices.java | 11 +- 18 files changed, 1309 insertions(+), 141 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesLogs.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/WrappedLogMetaRequest.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java index e3804e97b2d..ba93df92b1f 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java @@ -69,6 +69,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.webapp.WrappedLogMetaRequest; import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams; import org.apache.hadoop.yarn.server.webapp.LogServlet; import org.apache.hadoop.yarn.server.webapp.WebServices; @@ -83,7 +84,7 @@ import com.google.inject.Inject; public class HsWebServices extends WebServices { private final HistoryContext ctx; private WebApp webapp; - private final LogServlet logServlet; + private LogServlet logServlet; private @Context HttpServletResponse response; @Context UriInfo uriInfo; @@ -422,18 +423,39 @@ public class HsWebServices extends WebServices { return new JobTaskAttemptCounterInfo(ta); } + @GET + @Path("/aggregatedlogs") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + @InterfaceAudience.Public + @InterfaceStability.Unstable + public Response getAggregatedLogsMeta(@Context HttpServletRequest hsr, + @QueryParam(YarnWebServiceParams.APP_ID) String appIdStr, + @QueryParam(YarnWebServiceParams.APPATTEMPT_ID) String appAttemptIdStr, + @QueryParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, + @QueryParam(YarnWebServiceParams.NM_ID) String nmId, + @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE) + @DefaultValue("false") boolean redirectedFromNode) { + init(); + return logServlet.getLogsInfo(hsr, appIdStr, appAttemptIdStr, + containerIdStr, nmId, redirectedFromNode); + } + @GET 
@Path("/containers/{containerid}/logs") @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) @InterfaceAudience.Public @InterfaceStability.Unstable - public Response getLogs(@Context HttpServletRequest hsr, + public Response getContainerLogs(@Context HttpServletRequest hsr, @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, @QueryParam(YarnWebServiceParams.NM_ID) String nmId, @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE) @DefaultValue("false") boolean redirectedFromNode) { init(); - return logServlet.getContainerLogsInfo(hsr, containerIdStr, nmId, + + WrappedLogMetaRequest.Builder logMetaRequestBuilder = + LogServlet.createRequestFromContainerId(containerIdStr); + + return logServlet.getContainerLogsInfo(hsr, logMetaRequestBuilder, nmId, redirectedFromNode, null); } @@ -442,7 +464,7 @@ public class HsWebServices extends WebServices { @Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 }) @InterfaceAudience.Public @InterfaceStability.Unstable - public Response getLogs(@Context HttpServletRequest req, + public Response getContainerLogFile(@Context HttpServletRequest req, @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, @PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename, @@ -457,4 +479,14 @@ public class HsWebServices extends WebServices { return logServlet.getLogFile(req, containerIdStr, filename, format, size, nmId, redirectedFromNode, null); } + + @VisibleForTesting + LogServlet getLogServlet() { + return this.logServlet; + } + + @VisibleForTesting + void setLogServlet(LogServlet logServlet) { + this.logServlet = logServlet; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesLogs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesLogs.java new file mode 
100644 index 00000000000..dccb5c87fe9 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesLogs.java @@ -0,0 +1,780 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.v2.hs.webapp; + +import com.google.common.collect.Sets; +import com.google.inject.Guice; +import com.google.inject.servlet.ServletModule; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.GenericType; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; +import com.sun.jersey.test.framework.WebAppDescriptor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.hs.HistoryContext; +import org.apache.hadoop.mapreduce.v2.hs.MockHistoryContext; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; +import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; +import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils; +import org.apache.hadoop.yarn.server.webapp.LogServlet; +import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams; +import 
org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; +import org.apache.hadoop.yarn.webapp.BadRequestException; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.GuiceServletConfig; +import org.apache.hadoop.yarn.webapp.JerseyTestBase; +import org.apache.hadoop.yarn.webapp.WebApp; +import org.hamcrest.core.IsNull; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; + +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.core.StringContains.containsString; +import static org.hamcrest.core.IsNot.not; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +/** + * We created the following aggregated log structure, and test the log + * related API endpoints of {@link HsWebServices}. 
+ * + * application_1 is finished + * attempt_1 + * container_1 finished on node_1 syslog + * container_2 finished on node_1 syslog + * container_3 finished on node_2 syslog + * attempt_2 + * container_1 finished on node_1 syslog + * + * application_2 is running + * attempt_1 + * container_1 finished on node_1 syslog + * attempt_2 + * container_1 finished on node_1 syslog + * container_2 running on node_1 syslog + * container_3 running on node_2 syslog (with some already aggregated log) + * + */ +public class TestHsWebServicesLogs extends JerseyTestBase { + + private static Configuration conf = new YarnConfiguration(); + private static FileSystem fs; + + private static final String LOCAL_ROOT_LOG_DIR = "target/LocalLogs"; + private static final String REMOTE_LOG_ROOT_DIR = "target/logs/"; + + private static final String USER = "fakeUser"; + private static final String FILE_NAME = "syslog"; + + private static final String NM_WEBADDRESS_1 = "test-nm-web-address-1:9999"; + private static final NodeId NM_ID_1 = NodeId.newInstance("fakeHost1", 9951); + private static final String NM_WEBADDRESS_2 = "test-nm-web-address-2:9999"; + private static final NodeId NM_ID_2 = NodeId.newInstance("fakeHost2", 9952); + + private static final ApplicationId APPID_1 = ApplicationId.newInstance(1, 1); + private static final ApplicationId APPID_2 = ApplicationId.newInstance(10, 2); + + private static final ApplicationAttemptId APP_ATTEMPT_1_1 = + ApplicationAttemptId.newInstance(APPID_1, 1); + private static final ApplicationAttemptId APP_ATTEMPT_1_2 = + ApplicationAttemptId.newInstance(APPID_1, 2); + private static final ApplicationAttemptId APP_ATTEMPT_2_1 = + ApplicationAttemptId.newInstance(APPID_2, 1); + private static final ApplicationAttemptId APP_ATTEMPT_2_2 = + ApplicationAttemptId.newInstance(APPID_2, 2); + + private static final ContainerId CONTAINER_1_1_1 = + ContainerId.newContainerId(APP_ATTEMPT_1_1, 1); + private static final ContainerId CONTAINER_1_1_2 = + 
ContainerId.newContainerId(APP_ATTEMPT_1_1, 2); + private static final ContainerId CONTAINER_1_1_3 = + ContainerId.newContainerId(APP_ATTEMPT_1_1, 3); + private static final ContainerId CONTAINER_1_2_1 = + ContainerId.newContainerId(APP_ATTEMPT_1_2, 1); + private static final ContainerId CONTAINER_2_1_1 = + ContainerId.newContainerId(APP_ATTEMPT_2_1, 1); + private static final ContainerId CONTAINER_2_2_1 = + ContainerId.newContainerId(APP_ATTEMPT_2_2, 1); + private static final ContainerId CONTAINER_2_2_2 = + ContainerId.newContainerId(APP_ATTEMPT_2_2, 2); + private static final ContainerId CONTAINER_2_2_3 = + ContainerId.newContainerId(APP_ATTEMPT_2_2, 3); + + static { + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT_DIR); + } + + private static class WebServletModule extends ServletModule { + @Override + protected void configureServlets() { + MockHistoryContext appContext = new MockHistoryContext(0, 1, 2, 1); + HsWebApp webApp = mock(HsWebApp.class); + when(webApp.name()).thenReturn("hsmockwebapp"); + + ApplicationClientProtocol mockProtocol = + mock(ApplicationClientProtocol.class); + try { + doAnswer(invocationOnMock -> { + GetApplicationReportRequest request = + (GetApplicationReportRequest) invocationOnMock.getArguments()[0]; + // returning the latest application attempt for each application + if (request.getApplicationId().equals(APPID_1)) { + return GetApplicationReportResponse.newInstance( + newApplicationReport(APPID_1, APP_ATTEMPT_1_2, false)); + } else if (request.getApplicationId().equals(APPID_2)) { + return GetApplicationReportResponse.newInstance( + newApplicationReport(APPID_2, APP_ATTEMPT_2_2, true)); + } + throw new RuntimeException( + "Unknown applicationId: " + request.getApplicationId()); + }).when(mockProtocol).getApplicationReport(any()); + + doAnswer(invocationOnMock -> { + GetContainerReportRequest request = + (GetContainerReportRequest) 
invocationOnMock.getArguments()[0]; + ContainerId cId = request.getContainerId(); + // for running containers assign node id and NM web address + if (cId.equals(CONTAINER_2_2_2)) { + return GetContainerReportResponse.newInstance( + newContainerReport(cId, NM_ID_1, NM_WEBADDRESS_1)); + } else if (cId.equals(CONTAINER_2_2_3)) { + return GetContainerReportResponse.newInstance( + newContainerReport(cId, NM_ID_2, NM_WEBADDRESS_2)); + } + // for finished application don't assign node id and NM web address + return GetContainerReportResponse.newInstance( + newContainerReport(cId, null, null)); + }).when(mockProtocol).getContainerReport(any()); + } catch (Exception ignore) { + fail("Failed to setup WebServletModule class"); + } + + HsWebServices hsWebServices = + new HsWebServices(appContext, conf, webApp, mockProtocol); + try { + LogServlet logServlet = hsWebServices.getLogServlet(); + logServlet = spy(logServlet); + doReturn(null).when(logServlet).getNMWebAddressFromRM(any()); + doReturn(NM_WEBADDRESS_1).when(logServlet).getNMWebAddressFromRM( + NM_ID_1.toString()); + doReturn(NM_WEBADDRESS_2).when(logServlet).getNMWebAddressFromRM( + NM_ID_2.toString()); + hsWebServices.setLogServlet(logServlet); + } catch (Exception ignore) { + fail("Failed to setup WebServletModule class"); + } + + bind(JAXBContextResolver.class); + bind(HsWebServices.class).toInstance(hsWebServices); + bind(GenericExceptionHandler.class); + bind(WebApp.class).toInstance(webApp); + bind(AppContext.class).toInstance(appContext); + bind(HistoryContext.class).toInstance(appContext); + bind(Configuration.class).toInstance(conf); + bind(ApplicationClientProtocol.class).toInstance(mockProtocol); + + serve("/*").with(GuiceContainer.class); + } + } + + @BeforeClass + public static void setupClass() throws Exception { + fs = FileSystem.get(conf); + createAggregatedFolders(); + GuiceServletConfig.setInjector( + Guice.createInjector(new WebServletModule())); + } + + @Before + public void setUp() { + 
GuiceServletConfig.setInjector( + Guice.createInjector(new WebServletModule())); + } + + /** + * Generating aggregated container logs for all containers + * except CONTAINER_2_2_2, which is still running. + * + * @throws Exception if failed to create aggregated log files + */ + private static void createAggregatedFolders() throws Exception { + Map contentsApp1 = new HashMap<>(); + contentsApp1.put(CONTAINER_1_1_1, "Hello-" + CONTAINER_1_1_1); + contentsApp1.put(CONTAINER_1_1_2, "Hello-" + CONTAINER_1_1_2); + contentsApp1.put(CONTAINER_1_2_1, "Hello-" + CONTAINER_1_2_1); + + TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, + LOCAL_ROOT_LOG_DIR, APPID_1, contentsApp1, NM_ID_1, FILE_NAME, + USER, false); + + TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, + LOCAL_ROOT_LOG_DIR, APPID_1, Collections.singletonMap(CONTAINER_1_1_3, + "Hello-" + CONTAINER_1_1_3), NM_ID_2, FILE_NAME, USER, false); + + Map contentsApp2 = new HashMap<>(); + contentsApp2.put(CONTAINER_2_1_1, "Hello-" + CONTAINER_2_1_1); + contentsApp2.put(CONTAINER_2_2_1, "Hello-" + CONTAINER_2_2_1); + + TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, + LOCAL_ROOT_LOG_DIR, APPID_2, contentsApp2, NM_ID_1, FILE_NAME, + USER, false); + + TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, + LOCAL_ROOT_LOG_DIR, APPID_2, Collections.singletonMap(CONTAINER_2_2_3, + "Hello-" + CONTAINER_2_2_3), NM_ID_2, FILE_NAME, USER, false); + } + + public TestHsWebServicesLogs() { + super(new WebAppDescriptor.Builder( + "org.apache.hadoop.mapreduce.v2.hs.webapp") + .contextListenerClass(GuiceServletConfig.class) + .filterClass(com.google.inject.servlet.GuiceFilter.class) + .contextPath("jersey-guice-filter").servletPath("/").build()); + } + + @AfterClass + public static void tearDownClass() throws Exception { + fs.delete(new Path(REMOTE_LOG_ROOT_DIR), true); + fs.delete(new Path(LOCAL_ROOT_LOG_DIR), true); + } + + @Test + public void 
testGetAggregatedLogsMetaForFinishedApp() { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("aggregatedlogs") + .queryParam(YarnWebServiceParams.APP_ID, APPID_1.toString()) + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + + List responseList = + response.getEntity(new GenericType>(){}); + Set expectedIdStrings = Sets.newHashSet( + CONTAINER_1_1_1.toString(), CONTAINER_1_1_2.toString(), + CONTAINER_1_1_3.toString(), CONTAINER_1_2_1.toString()); + + assertResponseList(responseList, expectedIdStrings, false); + + for (ContainerLogsInfo logsInfo : responseList) { + String cId = logsInfo.getContainerId(); + + assertThat(logsInfo.getLogType(), equalTo( + ContainerLogAggregationType.AGGREGATED.toString())); + + if (cId.equals(CONTAINER_1_1_3.toString())) { + assertThat(logsInfo.getNodeId(), equalTo(formatNodeId(NM_ID_2))); + } else { + assertThat(logsInfo.getNodeId(), equalTo(formatNodeId(NM_ID_1))); + } + + assertSimpleContainerLogFileInfo(logsInfo, cId); + } + } + + @Test + public void testGetAggregatedLogsMetaForRunningApp() { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("aggregatedlogs") + .queryParam(YarnWebServiceParams.APP_ID, APPID_2.toString()) + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + + List responseList = + response.getEntity(new GenericType>(){}); + Set expectedIdStrings = Sets.newHashSet( + CONTAINER_2_1_1.toString(), CONTAINER_2_2_1.toString(), + CONTAINER_2_2_3.toString()); + assertResponseList(responseList, expectedIdStrings, true); + + for (ContainerLogsInfo logsInfo : responseList) { + String cId = logsInfo.getContainerId(); + + if (cId.equals(CONTAINER_2_2_3.toString())) { + assertThat(logsInfo.getNodeId(), equalTo(formatNodeId(NM_ID_2))); + } else { + assertThat(logsInfo.getNodeId(), equalTo(formatNodeId(NM_ID_1))); + } + + if (logsInfo.getLogType().equals( + 
ContainerLogAggregationType.AGGREGATED.toString())) { + assertSimpleContainerLogFileInfo(logsInfo, cId); + } + } + } + + @Test + public void testGetAggregatedLogsMetaForFinishedAppAttempt() { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("aggregatedlogs") + .queryParam( + YarnWebServiceParams.APPATTEMPT_ID, APP_ATTEMPT_1_1.toString()) + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + + List responseList = + response.getEntity(new GenericType>(){}); + Set expectedIdStrings = Sets.newHashSet( + CONTAINER_1_1_1.toString(), CONTAINER_1_1_2.toString(), + CONTAINER_1_1_3.toString()); + assertResponseList(responseList, expectedIdStrings, false); + + for (ContainerLogsInfo logsInfo : responseList) { + String cId = logsInfo.getContainerId(); + + assertThat(logsInfo.getLogType(), equalTo( + ContainerLogAggregationType.AGGREGATED.toString())); + + if (cId.equals(CONTAINER_1_1_3.toString())) { + assertThat(logsInfo.getNodeId(), equalTo(formatNodeId(NM_ID_2))); + } else { + assertThat(logsInfo.getNodeId(), equalTo(formatNodeId(NM_ID_1))); + } + + assertSimpleContainerLogFileInfo(logsInfo, cId); + } + } + + @Test + public void testGetAggregatedLogsMetaForRunningAppAttempt() { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("aggregatedlogs") + .queryParam( + YarnWebServiceParams.APPATTEMPT_ID, APP_ATTEMPT_2_2.toString()) + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + + List responseList = + response.getEntity(new GenericType>(){}); + Set expectedIdStrings = Sets.newHashSet( + CONTAINER_2_2_1.toString(), CONTAINER_2_2_3.toString()); + assertResponseList(responseList, expectedIdStrings, true); + + for (ContainerLogsInfo logsInfo : responseList) { + String cId = logsInfo.getContainerId(); + + if (cId.equals(CONTAINER_2_2_3.toString())) { + assertThat(logsInfo.getNodeId(), equalTo(formatNodeId(NM_ID_2))); + } else { + 
assertThat(logsInfo.getNodeId(), equalTo(formatNodeId(NM_ID_1))); + } + + if (logsInfo.getLogType().equals( + ContainerLogAggregationType.AGGREGATED.toString())) { + assertSimpleContainerLogFileInfo(logsInfo, cId); + } + } + } + + @Test + public void testGetContainerLogsForFinishedContainer() { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("containers") + .path(CONTAINER_1_1_2.toString()).path("logs") + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + + List responseText = + response.getEntity(new GenericType>(){}); + assertThat(responseText.size(), is(1)); + + ContainerLogsInfo logsInfo = responseText.get(0); + assertThat(logsInfo.getLogType(), equalTo( + ContainerLogAggregationType.AGGREGATED.toString())); + assertThat(logsInfo.getContainerId(), equalTo(CONTAINER_1_1_2.toString())); + + assertSimpleContainerLogFileInfo(logsInfo, CONTAINER_1_1_2.toString()); + } + + @Test + public void testGetContainerLogsForRunningContainer() throws Exception { + WebResource r = resource(); + URI requestURI = r.path("ws").path("v1") + .path("history").path("containers") + .path(CONTAINER_2_2_2.toString()) + .path("logs") + .getURI(); + String redirectURL = getRedirectURL(requestURI.toString()); + assertThat(redirectURL, IsNull.notNullValue()); + assertThat(redirectURL, containsString(NM_WEBADDRESS_1)); + assertThat(redirectURL, containsString("ws/v1/node/containers")); + assertThat(redirectURL, containsString(CONTAINER_2_2_2.toString())); + assertThat(redirectURL, containsString("/logs")); + + // If we specify NM id, we would re-direct the request + // to this NM's Web Address. 
+ requestURI = r.path("ws").path("v1") + .path("history").path("containers") + .path(CONTAINER_2_2_2.toString()) + .path("logs") + .queryParam(YarnWebServiceParams.NM_ID, NM_ID_2.toString()) + .getURI(); + redirectURL = getRedirectURL(requestURI.toString()); + assertThat(redirectURL, IsNull.notNullValue()); + assertThat(redirectURL, containsString(NM_WEBADDRESS_2)); + assertThat(redirectURL, containsString("ws/v1/node/containers")); + assertThat(redirectURL, containsString(CONTAINER_2_2_2.toString())); + assertThat(redirectURL, containsString("/logs")); + + // If this is the redirect request, we would not re-direct the request + // back and get the aggregated log meta. + ClientResponse response = r.path("ws").path("v1") + .path("history").path("containers") + .path(CONTAINER_2_2_3.toString()) + .path("logs") + .queryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE, "true") + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + + List responseText = + response.getEntity(new GenericType>(){}); + assertThat(responseText.size(), is(2)); + + ContainerLogsInfo logsInfo1 = responseText.get(0); + ContainerLogsInfo logsInfo2 = responseText.get(1); + + assertThat(logsInfo1.getContainerId(), + equalTo(CONTAINER_2_2_3.toString())); + assertThat(logsInfo2.getContainerId(), + equalTo(CONTAINER_2_2_3.toString())); + + if (logsInfo1.getLogType().equals( + ContainerLogAggregationType.AGGREGATED.toString())) { + assertThat(logsInfo2.getLogType(), equalTo( + ContainerLogAggregationType.LOCAL.toString())); + + assertSimpleContainerLogFileInfo(logsInfo1, CONTAINER_2_2_3.toString()); + + // this information can be only obtained by the NM. + assertThat(logsInfo2.getContainerLogsInfo(), IsNull.nullValue()); + } else { + assertThat(logsInfo1.getLogType(), equalTo( + ContainerLogAggregationType.LOCAL.toString())); + assertThat(logsInfo2.getLogType(), equalTo( + ContainerLogAggregationType.AGGREGATED.toString())); + + // this information can be only obtained by the NM. 
+ assertThat(logsInfo1.getContainerLogsInfo(), IsNull.nullValue()); + + assertSimpleContainerLogFileInfo(logsInfo2, CONTAINER_2_2_3.toString()); + } + } + + @Test + public void testGetContainerLogFileForFinishedContainer() { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("containerlogs") + .path(CONTAINER_1_1_2.toString()) + .path(FILE_NAME) + .accept(MediaType.TEXT_PLAIN) + .get(ClientResponse.class); + String responseText = response.getEntity(String.class); + assertThat(responseText, not(containsString("Can not find logs"))); + assertThat(responseText, not(containsString("Hello-" + CONTAINER_1_1_1))); + } + + @Test + public void testNoRedirectForFinishedContainer() throws Exception { + WebResource r = resource(); + URI requestURI = r.path("ws").path("v1") + .path("history").path("containerlogs") + .path(CONTAINER_2_2_1.toString()) + .path(FILE_NAME).getURI(); + String redirectURL = getRedirectURL(requestURI.toString()); + assertThat(redirectURL, IsNull.nullValue()); + } + + /** + * For local logs we can only check the redirect to the appropriate node. + */ + @Test + public void testGetContainerLogFileForRunningContainer() throws Exception { + WebResource r = resource(); + URI requestURI = r.path("ws").path("v1") + .path("history").path("containerlogs") + .path(CONTAINER_2_2_2.toString()) + .path(FILE_NAME).getURI(); + String redirectURL = getRedirectURL(requestURI.toString()); + assertThat(redirectURL, IsNull.notNullValue()); + assertThat(redirectURL, containsString(NM_WEBADDRESS_1)); + assertThat(redirectURL, containsString("ws/v1/node/containers")); + assertThat(redirectURL, containsString(CONTAINER_2_2_2.toString())); + assertThat(redirectURL, containsString("/logs/" + FILE_NAME)); + + // If we specify NM id, we would re-direct the request + // to this NM's Web Address. 
+ requestURI = r.path("ws").path("v1") + .path("history").path("containerlogs") + .path(CONTAINER_2_2_2.toString()).path(FILE_NAME) + .queryParam(YarnWebServiceParams.NM_ID, NM_ID_2.toString()) + .getURI(); + redirectURL = getRedirectURL(requestURI.toString()); + assertThat(redirectURL, IsNull.notNullValue()); + assertThat(redirectURL, containsString(NM_WEBADDRESS_2)); + assertThat(redirectURL, containsString("ws/v1/node/containers")); + assertThat(redirectURL, containsString(CONTAINER_2_2_2.toString())); + assertThat(redirectURL, containsString("/logs/" + FILE_NAME)); + + // If this is the redirect request, we would not re-direct the request + // back and get the aggregated logs. + ClientResponse response = r.path("ws").path("v1") + .path("history").path("containerlogs") + .path(CONTAINER_2_2_3.toString()).path(FILE_NAME) + .queryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE, "true") + .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); + String responseText = response.getEntity(String.class); + assertThat(redirectURL, IsNull.notNullValue()); + + assertThat(responseText, containsString("LogAggregationType: " + + ContainerLogAggregationType.AGGREGATED)); + assertThat(responseText, containsString("Hello-" + CONTAINER_2_2_3)); + } + + @Test + public void testNonExistingAppId() { + ApplicationId nonExistingApp = ApplicationId.newInstance(99, 99); + + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("aggregatedlogs") + .queryParam(YarnWebServiceParams.APP_ID, nonExistingApp.toString()) + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + String responseText = response.getEntity(String.class); + assertThat(responseText, containsString( + WebApplicationException.class.getSimpleName())); + assertThat(responseText, containsString("Can not find")); + } + + @Test + public void testBadAppId() { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + 
.path("history").path("aggregatedlogs") + .queryParam(YarnWebServiceParams.APP_ID, "some text") + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + String responseText = response.getEntity(String.class); + assertThat(responseText, containsString( + BadRequestException.class.getSimpleName())); + assertThat(responseText, containsString("Invalid ApplicationId")); + } + + @Test + public void testNonExistingAppAttemptId() { + ApplicationId nonExistingApp = ApplicationId.newInstance(99, 99); + ApplicationAttemptId nonExistingAppAttemptId = + ApplicationAttemptId.newInstance(nonExistingApp, 1); + + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("aggregatedlogs") + .queryParam(YarnWebServiceParams.APPATTEMPT_ID, + nonExistingAppAttemptId.toString()) + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + String responseText = response.getEntity(String.class); + assertThat(responseText, containsString( + WebApplicationException.class.getSimpleName())); + assertThat(responseText, containsString("Can not find")); + } + + @Test + public void testBadAppAttemptId() { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("aggregatedlogs") + .queryParam(YarnWebServiceParams.APPATTEMPT_ID, "some text") + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + String responseText = response.getEntity(String.class); + assertThat(responseText, containsString( + BadRequestException.class.getSimpleName())); + assertThat(responseText, containsString("Invalid AppAttemptId")); + } + + @Test + public void testNonExistingContainerId() { + ApplicationId nonExistingApp = ApplicationId.newInstance(99, 99); + ApplicationAttemptId nonExistingAppAttemptId = + ApplicationAttemptId.newInstance(nonExistingApp, 1); + ContainerId nonExistingContainerId = + ContainerId.newContainerId(nonExistingAppAttemptId, 1); + + WebResource r = resource(); + 
ClientResponse response = r.path("ws").path("v1") + .path("history").path("aggregatedlogs") + .queryParam(YarnWebServiceParams.CONTAINER_ID, + nonExistingContainerId.toString()) + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + String responseText = response.getEntity(String.class); + assertThat(responseText, containsString( + WebApplicationException.class.getSimpleName())); + assertThat(responseText, containsString("Can not find")); + } + + @Test + public void testBadContainerId() { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("aggregatedlogs") + .queryParam(YarnWebServiceParams.CONTAINER_ID, "some text") + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + String responseText = response.getEntity(String.class); + assertThat(responseText, containsString( + BadRequestException.class.getSimpleName())); + assertThat(responseText, containsString("Invalid ContainerId")); + } + + @Test + public void testNonExistingContainerMeta() { + ApplicationId nonExistingApp = ApplicationId.newInstance(99, 99); + ApplicationAttemptId nonExistingAppAttemptId = + ApplicationAttemptId.newInstance(nonExistingApp, 1); + ContainerId nonExistingContainerId = + ContainerId.newContainerId(nonExistingAppAttemptId, 1); + + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("containers") + .path(nonExistingContainerId.toString()).path("logs") + .accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + + String responseText = response.getEntity(String.class); + assertThat(responseText, containsString( + WebApplicationException.class.getSimpleName())); + assertThat(responseText, containsString("Can not find")); + } + + @Test + public void testBadContainerForMeta() { + WebResource r = resource(); + ClientResponse response = r.path("ws").path("v1") + .path("history").path("containers") + .path("some text").path("logs") + 
.accept(MediaType.APPLICATION_JSON) + .get(ClientResponse.class); + + String responseText = response.getEntity(String.class); + assertThat(responseText, containsString( + BadRequestException.class.getSimpleName())); + assertThat(responseText, containsString("Invalid container id")); + } + + private static void assertSimpleContainerLogFileInfo( + ContainerLogsInfo logsInfo, String cId) { + assertThat(logsInfo.getContainerLogsInfo(), IsNull.notNullValue()); + assertThat(logsInfo.getContainerLogsInfo().size(), is(1)); + ContainerLogFileInfo fileInfo = logsInfo.getContainerLogsInfo().get(0); + assertThat(fileInfo.getFileName(), equalTo(FILE_NAME)); + assertThat(fileInfo.getFileSize(), is( + String.valueOf(("Hello-" + cId).length()))); + } + + private static void assertResponseList(List responseList, + Set expectedIdStrings, boolean running) { + Set actualStrings = + responseList.stream() + .map(ContainerLogsInfo::getContainerId) + .collect(Collectors.toSet()); + assertThat(actualStrings, equalTo(expectedIdStrings)); + + int expectedSize = expectedIdStrings.size(); + assertThat(responseList.size(), is( + running ? expectedSize * 2 : expectedSize)); + } + + private static String formatNodeId(NodeId nodeId) { + return nodeId.toString().replace(":", "_"); + } + + private static ApplicationReport newApplicationReport(ApplicationId appId, + ApplicationAttemptId appAttemptId, boolean running) { + return ApplicationReport.newInstance(appId, appAttemptId, USER, + "fakeQueue", "fakeApplicationName", "localhost", 0, null, + running ? 
YarnApplicationState.RUNNING : YarnApplicationState.FINISHED, + "fake an application report", "", 1000L, 1000L, 1000L, null, null, + "", 50f, "fakeApplicationType", null); + } + + private static ContainerReport newContainerReport(ContainerId containerId, + NodeId nodeId, String nmWebAddress) { + return ContainerReport.newInstance(containerId, null, nodeId, + Priority.UNDEFINED, 0, 0, null, null, 0, null, nmWebAddress); + } + + private static String getRedirectURL(String url) throws Exception { + HttpURLConnection conn = (HttpURLConnection) new URL(url) + .openConnection(); + // do not automatically follow the redirection + // otherwise we get too many redirection exceptions + conn.setInstanceFollowRedirects(false); + if (conn.getResponseCode() == HttpServletResponse.SC_TEMPORARY_REDIRECT) { + return conn.getHeaderField("Location"); + } + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index 4ada007f9cd..1f833d5ed31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -354,7 +354,7 @@ public class LogsCLI extends Configured implements Tool { } - ContainerLogsRequest request = new ContainerLogsRequest(appId, + ContainerLogsRequest request = new ContainerLogsRequest(appId, null, Apps.isApplicationFinalState(appState), appOwner, nodeAddress, null, containerIdStr, localDir, logs, bytes, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRequest.java index 
30aeb6cc78b..da393cdce4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRequest.java @@ -19,11 +19,14 @@ package org.apache.hadoop.yarn.logaggregation; import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerState; public class ContainerLogsRequest { private ApplicationId appId; + private ApplicationAttemptId appAttemptId; private String containerId; private String nodeId; private String nodeHttpAddress; @@ -38,6 +41,7 @@ public class ContainerLogsRequest { public ContainerLogsRequest(ContainerLogsRequest request) { this.setAppId(request.getAppId()); + this.setAppAttemptId(request.getAppAttemptId()); this.setAppFinished(request.isAppFinished()); this.setAppOwner(request.getAppOwner()); this.setNodeId(request.getNodeId()); @@ -50,10 +54,11 @@ public class ContainerLogsRequest { } public ContainerLogsRequest(ApplicationId applicationId, - boolean isAppFinished, String owner, + ApplicationAttemptId appAttemptId, boolean isAppFinished, String owner, String address, String httpAddress, String container, String localDir, Set logs, long bytes, ContainerState containerState) { this.setAppId(applicationId); + this.setAppAttemptId(appAttemptId); this.setAppFinished(isAppFinished); this.setAppOwner(owner); this.setNodeId(address); @@ -73,6 +78,14 @@ public class ContainerLogsRequest { this.appId = appId; } + public ApplicationAttemptId getAppAttemptId() { + return this.appAttemptId; + } + + public void setAppAttemptId(ApplicationAttemptId appAttemptId) { + this.appAttemptId = appAttemptId; + } + public String getContainerId() { return containerId; } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 6f1162ad892..7db5a269fe2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -47,7 +47,9 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -583,4 +585,18 @@ public abstract class LogAggregationFileController { public boolean isFsSupportsChmod() { return fsSupportsChmod; } + + protected boolean belongsToAppAttempt(ApplicationAttemptId appAttemptId, + String containerIdStr) { + ContainerId containerId = null; + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException exc) { + LOG.warn("Could not parse container id from aggregated log.", exc); + } + if (containerId != null && containerId.getApplicationAttemptId() != null) { + return containerId.getApplicationAttemptId().equals(appAttemptId); + } + return false; + } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 2b6a6101bae..00739616d79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -67,6 +67,7 @@ import org.apache.hadoop.io.file.tfile.SimpleBufferedOutputStream; import org.apache.hadoop.io.file.tfile.Compression.Algorithm; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -590,8 +591,9 @@ public class LogAggregationIndexedFileController String nodeId = logRequest.getNodeId(); ApplicationId appId = logRequest.getAppId(); String appOwner = logRequest.getAppOwner(); - boolean getAllContainers = (containerIdStr == null || - containerIdStr.isEmpty()); + ApplicationAttemptId appAttemptId = logRequest.getAppAttemptId(); + boolean getAllContainers = ((containerIdStr == null || + containerIdStr.isEmpty()) && appAttemptId != null); String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? 
null : LogAggregationUtils.getNodeString(nodeId); RemoteIterator nodeFiles = LogAggregationUtils @@ -635,8 +637,12 @@ public class LogAggregationIndexedFileController if (getAllContainers) { for (Entry> log : logMeta .getLogMetas().entrySet()) { + String currentContainerIdStr = log.getKey(); + if (!belongsToAppAttempt(appAttemptId, currentContainerIdStr)) { + continue; + } ContainerLogMeta meta = new ContainerLogMeta( - log.getKey().toString(), curNodeId); + log.getKey(), curNodeId); for (IndexedFileLogMeta aMeta : log.getValue()) { meta.addLogMeta(aMeta.getFileName(), Long.toString( aMeta.getFileSize()), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index 8d5338ac9ce..537bf8a46f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -27,6 +27,8 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.math3.util.Pair; @@ -264,7 +266,10 @@ public class LogAggregationTFileController String nodeId = logRequest.getNodeId(); ApplicationId appId = logRequest.getAppId(); String appOwner = logRequest.getAppOwner(); - boolean getAllContainers = (containerIdStr == null); + ApplicationAttemptId appAttemptId = logRequest.getAppAttemptId(); + boolean getAllContainers = 
(containerIdStr == null && + appAttemptId == null); + boolean getOnlyOneContainer = containerIdStr != null; String nodeIdStr = (nodeId == null) ? null : LogAggregationUtils.getNodeString(nodeId); RemoteIterator nodeFiles = LogAggregationUtils @@ -297,7 +302,8 @@ public class LogAggregationTFileController LogKey key = new LogKey(); valueStream = reader.next(key); while (valueStream != null) { - if (getAllContainers || (key.toString().equals(containerIdStr))) { + if (getAllContainers || (key.toString().equals(containerIdStr)) || + belongsToAppAttempt(appAttemptId, key.toString())) { ContainerLogMeta containerLogMeta = new ContainerLogMeta( key.toString(), thisNodeFile.getPath().getName()); while (true) { @@ -314,7 +320,7 @@ public class LogAggregationTFileController } } containersLogMeta.add(containerLogMeta); - if (!getAllContainers) { + if (getOnlyOneContainer) { break; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java index 8c0b147f7fe..f0545805106 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java @@ -52,23 +52,21 @@ public final class TestContainerLogsUtils { * @param conf the configuration * @param fs the FileSystem * @param rootLogDir the root log directory - * @param containerId the containerId + * @param appId the application id + * @param containerToContent mapping between container id and its content * @param nodeId the nodeId * @param fileName the log file name * @param user the application user - * @param content the log context - * @param deletePreviousRemoteLogDir whether to delete remote log dir. 
+ * @param deleteRemoteLogDir whether to delete remote log dir. * @throws IOException if we can not create log files locally * or we can not upload container logs into RemoteFS. */ public static void createContainerLogFileInRemoteFS(Configuration conf, - FileSystem fs, String rootLogDir, ContainerId containerId, NodeId nodeId, - String fileName, String user, String content, - boolean deleteRemoteLogDir) throws Exception { + FileSystem fs, String rootLogDir, ApplicationId appId, + Map containerToContent, NodeId nodeId, + String fileName, String user, boolean deleteRemoteLogDir) + throws Exception { UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); - //prepare the logs for remote directory - ApplicationId appId = containerId.getApplicationAttemptId() - .getApplicationId(); // create local logs List rootLogDirList = new ArrayList(); rootLogDirList.add(rootLogDir); @@ -83,8 +81,7 @@ public final class TestContainerLogsUtils { } assertTrue(fs.mkdirs(appLogsDir)); - createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName, - content); + createContainerLogInLocalDir(appLogsDir, containerToContent, fs, fileName); // upload container logs to remote log dir LogAggregationFileControllerFactory factory = new LogAggregationFileControllerFactory(conf); @@ -96,27 +93,33 @@ public final class TestContainerLogsUtils { fs.delete(path, true); } assertTrue(fs.mkdirs(path)); - uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId, - containerId, path, fs); + uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId, appId, + containerToContent.keySet(), path); } private static void createContainerLogInLocalDir(Path appLogsDir, - ContainerId containerId, FileSystem fs, String fileName, String content) - throws IOException{ - Path containerLogsDir = new Path(appLogsDir, containerId.toString()); - if (fs.exists(containerLogsDir)) { - fs.delete(containerLogsDir, true); + Map containerToContent, FileSystem fs, + String fileName) throws 
IOException { + for (Map.Entry containerAndContent : + containerToContent.entrySet()) { + ContainerId containerId = containerAndContent.getKey(); + String content = containerAndContent.getValue(); + Path containerLogsDir = new Path(appLogsDir, containerId.toString()); + if (fs.exists(containerLogsDir)) { + fs.delete(containerLogsDir, true); + } + assertTrue(fs.mkdirs(containerLogsDir)); + Writer writer = + new FileWriter(new File(containerLogsDir.toString(), fileName)); + writer.write(content); + writer.close(); } - assertTrue(fs.mkdirs(containerLogsDir)); - Writer writer = - new FileWriter(new File(containerLogsDir.toString(), fileName)); - writer.write(content); - writer.close(); } private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi, Configuration configuration, List rootLogDirs, NodeId nodeId, - ContainerId containerId, Path appDir, FileSystem fs) throws Exception { + ApplicationId appId, Iterable containerIds, Path appDir) + throws Exception { Path path = new Path(appDir, LogAggregationUtils.getNodeString(nodeId)); LogAggregationFileControllerFactory factory @@ -126,16 +129,16 @@ public final class TestContainerLogsUtils { try { Map appAcls = new HashMap<>(); appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName()); - ApplicationId appId = containerId.getApplicationAttemptId() - .getApplicationId(); LogAggregationFileControllerContext context = new LogAggregationFileControllerContext( path, path, true, 1000, appId, appAcls, nodeId, ugi); fileController.initializeWriter(context); - fileController.write(new AggregatedLogFormat.LogKey(containerId), - new AggregatedLogFormat.LogValue(rootLogDirs, containerId, - ugi.getShortUserName())); + for (ContainerId containerId : containerIds) { + fileController.write(new AggregatedLogFormat.LogKey(containerId), + new AggregatedLogFormat.LogValue(rootLogDirs, containerId, + ugi.getShortUserName())); + } } finally { fileController.closeWriter(); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java index 607b88becb6..970c5d8b45a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.ApplicationBaseProtocol; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; import org.apache.hadoop.yarn.server.webapp.LogServlet; import org.apache.hadoop.yarn.server.webapp.WebServices; +import org.apache.hadoop.yarn.server.webapp.WrappedLogMetaRequest; import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo; @@ -223,7 +224,11 @@ public class AHSWebServices extends WebServices { @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE) @DefaultValue("false") boolean redirected_from_node) { initForReadableEndpoints(res); - return logServlet.getContainerLogsInfo(req, containerIdStr, nmId, + + WrappedLogMetaRequest.Builder logMetaRequestBuilder = + LogServlet.createRequestFromContainerId(containerIdStr); + + return logServlet.getContainerLogsInfo(req, logMetaRequestBuilder, nmId, redirected_from_node, null); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java index 91b5bf0c807..495e87a95f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java @@ -32,6 +32,7 @@ import java.net.URI; import java.net.URL; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Properties; @@ -553,11 +554,13 @@ public class TestAHSWebServices extends JerseyTestBase { ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100); TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, - rootLogDir, containerId1, nodeId, fileName, user, - ("Hello." + containerId1), true); + rootLogDir, appId, Collections.singletonMap(containerId1, + "Hello." + containerId1), + nodeId, fileName, user, true); TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, - rootLogDir, containerId100, nodeId2, fileName, user, - ("Hello." + containerId100), false); + rootLogDir, appId, Collections.singletonMap(containerId100, + "Hello." + containerId100), + nodeId2, fileName, user, false); // test whether we can find container log from remote diretory if // the containerInfo for this container could be fetched from AHS. 
WebResource r = resource(); @@ -612,8 +615,10 @@ public class TestAHSWebServices extends JerseyTestBase { appAttemptId100, 1); TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, - rootLogDir, containerId1ForApp100, nodeId, fileName, user, - ("Hello." + containerId1ForApp100), true); + rootLogDir, appId100, + Collections.singletonMap(containerId1ForApp100, + "Hello." + containerId1ForApp100), + nodeId, fileName, user, true); r = resource(); response = r.path("ws").path("v1") .path("applicationhistory").path("containerlogs") @@ -766,7 +771,8 @@ public class TestAHSWebServices extends JerseyTestBase { String content = "Hello." + containerId1000; NodeId nodeId = NodeId.newInstance("test host", 100); TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, - rootLogDir, containerId1000, nodeId, fileName, user, content, true); + rootLogDir, appId, Collections.singletonMap(containerId1000, content), + nodeId, fileName, user, true); r = resource(); ClientResponse response = r.path("ws").path("v1") .path("applicationhistory").path("containerlogs") @@ -805,7 +811,8 @@ public class TestAHSWebServices extends JerseyTestBase { String content1 = "Hello." + containerId1; NodeId nodeId1 = NodeId.fromString(NM_ID); TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, - rootLogDir, containerId1, nodeId1, fileName, user, content1, true); + rootLogDir, appId, Collections.singletonMap(containerId1, content1), + nodeId1, fileName, user, true); response = r.path("ws").path("v1") .path("applicationhistory").path("containers") .path(containerId1.toString()).path("logs").path(fileName) @@ -865,7 +872,8 @@ public class TestAHSWebServices extends JerseyTestBase { String content = "Hello." 
+ containerId1000; NodeId nodeId = NodeId.newInstance("test host", 100); TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, - rootLogDir, containerId1000, nodeId, fileName, user, content, true); + rootLogDir, appId, Collections.singletonMap(containerId1000, content), + nodeId, fileName, user, true); ClientResponse response = r.path("ws").path("v1") .path("applicationhistory").path("containers") .path(containerId1000.toString()).path("logs") @@ -931,7 +939,8 @@ public class TestAHSWebServices extends JerseyTestBase { String content = "Hello." + containerId1; NodeId nodeId = NodeId.newInstance("test host", 100); TestContainerLogsUtils.createContainerLogFileInRemoteFS(conf, fs, - rootLogDir, containerId1, nodeId, fileName, user, content, true); + rootLogDir, appId, Collections.singletonMap(containerId1, content), + nodeId, fileName, user, true); WebResource r = resource(); ClientResponse response = r.path("ws").path("v1") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 39ec605096d..af0240f7713 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -140,6 +140,16 @@ curator-test test + + com.sun.jersey.jersey-test-framework + jersey-test-framework-core + + + com.sun.jersey.jersey-test-framework + jersey-test-framework-core + 1.19 + test + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java index 39e0ae308b5..d889344d8a1 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java @@ -24,9 +24,13 @@ import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.UniformInterfaceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; +import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -36,8 +40,12 @@ import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.GenericEntity; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import java.util.ArrayList; +import java.util.List; /** * Extracts aggregated logs and related information. 
@@ -65,46 +73,172 @@ public class LogServlet extends Configured { return LogWebServiceUtils.getNMWebAddressFromRM(getConf(), nodeId); } + private static List convertToContainerLogsInfo( + List containerLogMetas, + boolean emptyLocalContainerLogMeta) { + List containersLogsInfo = new ArrayList<>(); + for (ContainerLogMeta meta : containerLogMetas) { + ContainerLogsInfo logInfo = + new ContainerLogsInfo(meta, ContainerLogAggregationType.AGGREGATED); + containersLogsInfo.add(logInfo); + + if (emptyLocalContainerLogMeta) { + ContainerLogMeta emptyMeta = + new ContainerLogMeta(logInfo.getContainerId(), + logInfo.getNodeId() == null ? "N/A" : logInfo.getNodeId()); + ContainerLogsInfo empty = + new ContainerLogsInfo(emptyMeta, ContainerLogAggregationType.LOCAL); + containersLogsInfo.add(empty); + } + } + return containersLogsInfo; + } + + private static Response getContainerLogMeta( + WrappedLogMetaRequest request, boolean emptyLocalContainerLogMeta) { + try { + List containerLogMeta = request.getContainerLogMetas(); + if (containerLogMeta.isEmpty()) { + throw new NotFoundException("Can not get log meta for request."); + } + List containersLogsInfo = convertToContainerLogsInfo( + containerLogMeta, emptyLocalContainerLogMeta); + + GenericEntity> meta = + new GenericEntity>(containersLogsInfo) { + }; + Response.ResponseBuilder response = Response.ok(meta); + // Sending the X-Content-Type-Options response header with the value + // nosniff will prevent Internet Explorer from MIME-sniffing a response + // away from the declared content-type. + response.header("X-Content-Type-Options", "nosniff"); + return response.build(); + } catch (Exception ex) { + LOG.debug("Exception during request", ex); + throw new WebApplicationException(ex); + } + } + + /** + * Validates whether the user has provided at least one query param for + * the request. Also validates that if multiple query params are provided, + * they do not contradict. 
+ */ + private void validateUserInput(ApplicationId applicationId, + ApplicationAttemptId applicationAttemptId, ContainerId containerId) { + // At least one field should be set + if (applicationId == null && applicationAttemptId == null && + containerId == null) { + throw new IllegalArgumentException("Should set application id, " + + "application attempt id or container id."); + } + + // container id should belong to the app attempt and the app id, + // if provided + if (containerId != null) { + if (applicationAttemptId != null && !applicationAttemptId.equals( + containerId.getApplicationAttemptId())) { + throw new IllegalArgumentException( + String.format( + "Container %s does not belong to application attempt %s!", + containerId, applicationAttemptId)); + } + if (applicationId != null && !applicationId.equals( + containerId.getApplicationAttemptId().getApplicationId())) { + throw new IllegalArgumentException( + String.format( + "Container %s does not belong to application %s!", + containerId, applicationId)); + } + } + + // app attempt id should match the app id, if provided + if (applicationAttemptId != null && applicationId != null && + !applicationId.equals(applicationAttemptId.getApplicationId())) { + throw new IllegalArgumentException( + String.format( + "Application attempt %s does not belong to application %s!", + applicationAttemptId, applicationId)); + } + } + + public Response getLogsInfo(HttpServletRequest hsr, String appIdStr, + String appAttemptIdStr, String containerIdStr, String nmId, + boolean redirectedFromNode) { + ApplicationId appId = null; + if (appIdStr != null) { + try { + appId = ApplicationId.fromString(appIdStr); + } catch (IllegalArgumentException iae) { + throw new BadRequestException(iae); + } + } + + ApplicationAttemptId appAttemptId = null; + if (appAttemptIdStr != null) { + try { + appAttemptId = ApplicationAttemptId.fromString(appAttemptIdStr); + } catch (IllegalArgumentException iae) { + throw new BadRequestException(iae); + } + 
} + + ContainerId containerId = null; + if (containerIdStr != null) { + try { + containerId = ContainerId.fromString(containerIdStr); + } catch (IllegalArgumentException iae) { + throw new BadRequestException(iae); + } + } + + validateUserInput(appId, appAttemptId, containerId); + + WrappedLogMetaRequest.Builder logMetaRequestBuilder = + WrappedLogMetaRequest.builder() + .setApplicationId(appId) + .setApplicationAttemptId(appAttemptId) + .setContainerId(containerIdStr); + + return getContainerLogsInfo(hsr, logMetaRequestBuilder, nmId, + redirectedFromNode, null); + } + /** * Returns information about the logs for a specific container. * * @param req the {@link HttpServletRequest} - * @param containerIdStr container id + * @param builder builder instance for the log meta request * @param nmId NodeManager id * @param redirectedFromNode whether the request was redirected * @param clusterId the id of the cluster * @return {@link Response} object containing information about the logs */ public Response getContainerLogsInfo(HttpServletRequest req, - String containerIdStr, String nmId, boolean redirectedFromNode, + WrappedLogMetaRequest.Builder builder, + String nmId, boolean redirectedFromNode, String clusterId) { - ContainerId containerId = null; - try { - containerId = ContainerId.fromString(containerIdStr); - } catch (IllegalArgumentException e) { - throw new BadRequestException("invalid container id, " + containerIdStr); - } - ApplicationId appId = containerId.getApplicationAttemptId() - .getApplicationId(); + builder.setFactory(factory); + BasicAppInfo appInfo; try { - appInfo = appInfoProvider.getApp(req, appId.toString(), clusterId); + appInfo = appInfoProvider.getApp(req, builder.getAppId(), clusterId); } catch (Exception ex) { + LOG.warn("Could not obtain appInfo object from provider.", ex); // directly find logs from HDFS. 
- return LogWebServiceUtils - .getContainerLogMeta(factory, appId, null, null, containerIdStr, - false); + return getContainerLogMeta(builder.build(), false); } // if the application finishes, directly find logs // from HDFS. if (Apps.isApplicationFinalState(appInfo.getAppState())) { - return LogWebServiceUtils - .getContainerLogMeta(factory, appId, null, null, containerIdStr, - false); + return getContainerLogMeta(builder.build(), false); } if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) { String appOwner = appInfo.getUser(); + builder.setAppOwner(appOwner); + WrappedLogMetaRequest request = builder.build(); + String nodeHttpAddress = null; if (nmId != null && !nmId.isEmpty()) { try { @@ -114,17 +248,17 @@ public class LogServlet extends Configured { } } if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) { - try { - nodeHttpAddress = appInfoProvider.getNodeHttpAddress( - req, appId.toString(), - containerId.getApplicationAttemptId().toString(), - containerId.toString(), clusterId); - } catch (Exception ex) { - // return log meta for the aggregated logs if exists. - // It will also return empty log meta for the local logs. - return LogWebServiceUtils - .getContainerLogMeta(factory, appId, appOwner, null, - containerIdStr, true); + if (request.getContainerId() != null) { + try { + nodeHttpAddress = appInfoProvider.getNodeHttpAddress( + req, request.getAppId(), request.getAppAttemptId(), + request.getContainerId().toString(), clusterId); + } catch (Exception ex) { + LOG.warn("Could not obtain node HTTP address from provider.", ex); + // return log meta for the aggregated logs if exists. + // It will also return empty log meta for the local logs. + return getContainerLogMeta(request, true); + } } // make sure nodeHttpAddress is not null and not empty. Otherwise, // we would only get log meta for aggregated logs instead of @@ -135,11 +269,15 @@ public class LogServlet extends Configured { // It will also return empty log meta for the local logs. 
// If this is the redirect request from NM, we should not // re-direct the request back. Simply output the aggregated log meta. - return LogWebServiceUtils - .getContainerLogMeta(factory, appId, appOwner, null, - containerIdStr, true); + return getContainerLogMeta(request, true); } } + ContainerId containerId = request.getContainerId(); + if (containerId == null) { + throw new WebApplicationException( + new Exception("Could not redirect to node, as app attempt or " + + "application logs are requested.")); + } String uri = "/" + containerId.toString() + "/logs"; String resURI = JOINER.join( LogWebServiceUtils.getAbsoluteNMWebAddress(getConf(), @@ -192,7 +330,7 @@ public class LogServlet extends Configured { try { appInfo = appInfoProvider.getApp(req, appId.toString(), clusterId); } catch (Exception ex) { - // directly find logs from HDFS. + LOG.warn("Could not obtain appInfo object from provider.", ex); return LogWebServiceUtils .sendStreamOutputResponse(factory, appId, null, null, containerIdStr, filename, format, length, false); @@ -222,6 +360,7 @@ public class LogServlet extends Configured { containerId.getApplicationAttemptId().toString(), containerId.toString(), clusterId); } catch (Exception ex) { + LOG.warn("Could not obtain node HTTP address from provider.", ex); // output the aggregated logs return LogWebServiceUtils .sendStreamOutputResponse(factory, appId, appOwner, null, @@ -258,4 +397,16 @@ public class LogServlet extends Configured { "The application is not at Running or Finished State."); } } + + public static WrappedLogMetaRequest.Builder createRequestFromContainerId( + String containerIdStr) { + WrappedLogMetaRequest.Builder logMetaRequestBuilder = + WrappedLogMetaRequest.builder(); + try { + logMetaRequestBuilder.setContainerId(containerIdStr); + } catch (IllegalArgumentException e) { + throw new BadRequestException("Invalid container id: " + containerIdStr); + } + return logMetaRequestBuilder; + } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java index 1ad6b61e9be..9a9ec77dd75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java @@ -151,7 +151,11 @@ public class LogWebService implements AppInfoProvider { @DefaultValue("false") boolean redirectedFromNode, @QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) { initForReadableEndpoints(res); - return logServlet.getContainerLogsInfo(req, containerIdStr, nmId, + + WrappedLogMetaRequest.Builder logMetaRequestBuilder = + LogServlet.createRequestFromContainerId(containerIdStr); + + return logServlet.getContainerLogsInfo(req, logMetaRequestBuilder, nmId, redirectedFromNode, clusterId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java index defde4d39aa..10709237eda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java @@ -29,29 +29,25 @@ import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.yarn.api.records.ApplicationId; import 
org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; -import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; -import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; import org.apache.hadoop.yarn.webapp.ForbiddenException; -import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.GenericEntity; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.UndeclaredThrowableException; import java.nio.charset.Charset; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; /** @@ -60,56 +56,14 @@ import java.util.Set; @InterfaceAudience.Private @InterfaceStability.Evolving public final class LogWebServiceUtils { + private static final Logger LOG = + LoggerFactory.getLogger(LogWebServiceUtils.class); private LogWebServiceUtils() { } private static final Joiner DOT_JOINER = Joiner.on(". 
"); - public static Response getContainerLogMeta( - LogAggregationFileControllerFactory factory, ApplicationId appId, - String appOwner, final String nodeId, final String containerIdStr, - boolean emptyLocalContainerLogMeta) { - try { - ContainerLogsRequest request = new ContainerLogsRequest(); - request.setAppId(appId); - request.setAppOwner(appOwner); - request.setContainerId(containerIdStr); - request.setNodeId(nodeId); - List containerLogMeta = - factory.getFileControllerForRead(appId, appOwner) - .readAggregatedLogsMeta(request); - if (containerLogMeta.isEmpty()) { - throw new NotFoundException( - "Can not get log meta for container: " + containerIdStr); - } - List containersLogsInfo = new ArrayList<>(); - for (ContainerLogMeta meta : containerLogMeta) { - ContainerLogsInfo logInfo = - new ContainerLogsInfo(meta, ContainerLogAggregationType.AGGREGATED); - containersLogsInfo.add(logInfo); - } - if (emptyLocalContainerLogMeta) { - ContainerLogMeta emptyMeta = - new ContainerLogMeta(containerIdStr, "N/A"); - ContainerLogsInfo empty = - new ContainerLogsInfo(emptyMeta, ContainerLogAggregationType.LOCAL); - containersLogsInfo.add(empty); - } - GenericEntity> meta = - new GenericEntity>(containersLogsInfo) { - }; - Response.ResponseBuilder response = Response.ok(meta); - // Sending the X-Content-Type-Options response header with the value - // nosniff will prevent Internet Explorer from MIME-sniffing a response - // away from the declared content-type. 
- response.header("X-Content-Type-Options", "nosniff"); - return response.build(); - } catch (Exception ex) { - throw new WebApplicationException(ex); - } - } - public static Response sendStreamOutputResponse( LogAggregationFileControllerFactory factory, ApplicationId appId, String appOwner, String nodeId, String containerIdStr, String fileName, @@ -131,6 +85,7 @@ public final class LogWebServiceUtils { getStreamingOutput(factory, appId, appOwner, nodeId, containerIdStr, fileName, bytes, printEmptyLocalContainerLog); } catch (Exception ex) { + LOG.debug("Exception", ex); return createBadResponse(Response.Status.INTERNAL_SERVER_ERROR, ex.getMessage()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/WrappedLogMetaRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/WrappedLogMetaRequest.java new file mode 100644 index 00000000000..d39eef8cee4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/WrappedLogMetaRequest.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.webapp; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.List; + +/** + * WrappedLogMetaRequest is wrapping a log request initiated by the client. + * This wrapper class translates the request to a {@link ContainerLogsRequest} + * and calls #readAggregatedLogsMeta on the + * {@link LogAggregationFileController} + * class. 
+ */ +public class WrappedLogMetaRequest { + + private final LogAggregationFileControllerFactory factory; + private final ApplicationId appId; + private final String appOwner; + private final ContainerId containerId; + private final String nodeId; + private final ApplicationAttemptId applicationAttemptId; + + private WrappedLogMetaRequest(Builder builder) { + this.factory = builder.factory; + this.appId = builder.appId; + this.appOwner = builder.appOwner; + this.containerId = builder.containerId; + this.nodeId = builder.nodeId; + this.applicationAttemptId = builder.applicationAttemptId; + } + + public static class Builder { + private LogAggregationFileControllerFactory factory; + private ApplicationId appId; + private String appOwner; + private ContainerId containerId; + private String nodeId; + private ApplicationAttemptId applicationAttemptId; + + Builder() { + } + + Builder setFactory(LogAggregationFileControllerFactory logFactory) { + this.factory = logFactory; + return this; + } + + public Builder setApplicationId(ApplicationId applicationId) { + this.appId = applicationId; + return this; + } + + Builder setNodeId(String nid) { + this.nodeId = nid; + return this; + } + + public Builder setContainerId(@Nullable String containerIdStr) { + if (containerIdStr != null) { + this.containerId = ContainerId.fromString(containerIdStr); + } + return this; + } + + Builder setAppOwner(String user) { + this.appOwner = user; + return this; + } + + public Builder setApplicationAttemptId(ApplicationAttemptId appAttemptId) { + this.applicationAttemptId = appAttemptId; + return this; + } + + String getAppId() { + return WrappedLogMetaRequest.getAppId(appId, applicationAttemptId, + containerId); + } + + WrappedLogMetaRequest build() { + if (this.factory == null) { + throw new AssertionError("WrappedLogMetaRequest's builder should be " + + "given a LogAggregationFileControllerFactory as parameter."); + } + return new WrappedLogMetaRequest(this); + } + } + + public static Builder 
builder() { + return new Builder(); + } + + private static String getAppId(ApplicationId appId, + ApplicationAttemptId applicationAttemptId, ContainerId containerId) { + if (appId == null) { + if (applicationAttemptId == null) { + return containerId.getApplicationAttemptId().getApplicationId() + .toString(); + } else { + return applicationAttemptId.getApplicationId().toString(); + } + } + return appId.toString(); + } + + public String getAppId() { + return getAppId(appId, applicationAttemptId, containerId); + } + + public String getAppAttemptId() { + if (applicationAttemptId == null) { + if (containerId != null) { + return containerId.getApplicationAttemptId().toString(); + } else { + return null; + } + } else { + return applicationAttemptId.toString(); + } + } + + public ContainerId getContainerId() { + return containerId; + } + + /** + * Constructs a {@link ContainerLogsRequest} object, and obtains + * {@link ContainerLogMeta} objects from the corresponding + * {@link LogAggregationFileController}. 
+ * + * @return list of {@link ContainerLogMeta} objects that belong + * to the application, attempt or container + */ + public List getContainerLogMetas() throws IOException { + ApplicationId applicationId = ApplicationId.fromString(getAppId()); + + ContainerLogsRequest request = new ContainerLogsRequest(); + request.setAppId(applicationId); + request.setAppAttemptId(applicationAttemptId); + if (containerId != null) { + request.setContainerId(containerId.toString()); + } + request.setAppOwner(appOwner); + request.setNodeId(nodeId); + + return factory.getFileControllerForRead(applicationId, appOwner) + .readAggregatedLogsMeta(request); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java index 5f96f231524..c737fc82a24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java @@ -29,6 +29,8 @@ import org.apache.hadoop.classification.InterfaceAudience; public interface YarnWebServiceParams { // the params used in container-log related web services + String APP_ID = "appid"; + String APPATTEMPT_ID = "appattemptid"; String CONTAINER_ID = "containerid"; String CONTAINER_LOG_FILE_NAME = "filename"; String RESPONSE_CONTENT_FORMAT = "format"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java index 1bb0408d944..16074fbeed1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerLogsInfo.java @@ -24,7 +24,6 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; @@ -61,7 +60,7 @@ public class ContainerLogsInfo { public ContainerLogsInfo() {} public ContainerLogsInfo(ContainerLogMeta logMeta, - ContainerLogAggregationType logType) throws YarnException { + ContainerLogAggregationType logType) { this.containerLogsInfo = new ArrayList( logMeta.getContainerLogMeta()); this.logType = logType.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index dbd980b8501..88b06ea3cb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -93,6 +93,7 @@ import java.net.HttpURLConnection; import java.net.URI; import java.net.URL; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -668,8 +669,9 @@ public class TestNMWebServices extends JerseyTestBase { String aggregatedLogMessage = "This is aggregated ;og."; TestContainerLogsUtils.createContainerLogFileInRemoteFS( nmContext.getConf(), FileSystem.get(nmContext.getConf()), - tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(), - aggregatedLogFile, "user", aggregatedLogMessage, true); + tempLogDir.getAbsolutePath(), appId, + Collections.singletonMap(containerId, aggregatedLogMessage), + nmContext.getNodeId(), aggregatedLogFile, "user", true); r1 = resource(); response = r1.path("ws").path("v1").path("node") .path("containers").path(containerIdStr) @@ -697,8 +699,9 @@ public class TestNMWebServices extends JerseyTestBase { // Test whether we could get aggregated log as well TestContainerLogsUtils.createContainerLogFileInRemoteFS( nmContext.getConf(), FileSystem.get(nmContext.getConf()), - tempLogDir.getAbsolutePath(), containerId, nmContext.getNodeId(), - filename, "user", aggregatedLogMessage, true); + tempLogDir.getAbsolutePath(), appId, + Collections.singletonMap(containerId, aggregatedLogMessage), + nmContext.getNodeId(), filename, "user", true); response = r.path(filename) .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); responseText = response.getEntity(String.class);