From 583f4594314b3db25b57b1e46ea8026eab21f932 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Wed, 7 Mar 2018 15:46:22 -0800 Subject: [PATCH] YARN-7891. LogAggregationIndexedFileController should support read from HAR file. (Xuan Gong via wangda) Change-Id: I16e081f21c5f80160564c49cc49d103bd8eb7e16 --- .../hadoop-yarn/hadoop-yarn-common/pom.xml | 4 ++ .../LogAggregationIndexedFileController.java | 60 ++++++++++++------ ...TestLogAggregationIndexFileController.java | 54 ++++++++++++++++ .../application_123456_0001.har/_SUCCESS | 0 .../application_123456_0001.har/_index | 3 + .../application_123456_0001.har/_masterindex | 2 + .../application_123456_0001.har/part-0 | Bin 0 -> 4123 bytes 7 files changed, 104 insertions(+), 19 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index a2354784d0f..537807227d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -249,6 +249,10 @@ src/test/resources/application_1440536969523_0001.har/part-0 src/test/resources/application_1440536969523_0001.har/_masterindex src/test/resources/application_1440536969523_0001.har/_SUCCESS + src/test/resources/application_123456_0001.har/_index + src/test/resources/application_123456_0001.har/part-0 + src/test/resources/application_123456_0001.har/_masterindex + src/test/resources/application_123456_0001.har/_SUCCESS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 56bae26d939..5bba2e0a409 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -495,16 +495,21 @@ public class LogAggregationIndexedFileController boolean getAllContainers = (containerIdStr == null || containerIdStr.isEmpty()); long size = logRequest.getBytes(); - List nodeFiles = LogAggregationUtils - .getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(), + RemoteIterator nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(), this.remoteRootLogDir, this.remoteRootLogDirSuffix); - if (nodeFiles.isEmpty()) { + if (!nodeFiles.hasNext()) { throw new IOException("There is no available log fils for " + "application:" + appId); } - Map checkSumFiles = parseCheckSumFiles(nodeFiles); + List allFiles = getAllNodeFiles(nodeFiles, appId); + if (allFiles.isEmpty()) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + Map checkSumFiles = parseCheckSumFiles(allFiles); List fileToRead = getNodeLogFileToRead( - nodeFiles, nodeIdStr, appId); + allFiles, nodeIdStr, appId); byte[] buf = new byte[65535]; for (FileStatus thisNodeFile : fileToRead) { String nodeName = thisNodeFile.getPath().getName(); @@ -609,16 +614,21 @@ public class LogAggregationIndexedFileController containerIdStr.isEmpty()); String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null : LogAggregationUtils.getNodeString(nodeId); - List nodeFiles = LogAggregationUtils - .getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir, + RemoteIterator nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir, this.remoteRootLogDirSuffix); - if (nodeFiles.isEmpty()) { + if (!nodeFiles.hasNext()) { throw new IOException("There is no available log fils for " + "application:" + appId); } - Map checkSumFiles = parseCheckSumFiles(nodeFiles); + List allFiles = getAllNodeFiles(nodeFiles, appId); + if (allFiles.isEmpty()) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + Map checkSumFiles = parseCheckSumFiles(allFiles); List fileToRead = getNodeLogFileToRead( - nodeFiles, nodeIdStr, appId); + allFiles, nodeIdStr, appId); for(FileStatus thisNodeFile : fileToRead) { try { Long checkSumIndex = checkSumFiles.get( @@ -727,25 +737,37 @@ public class LogAggregationIndexedFileController List nodeFiles, String nodeId, ApplicationId appId) throws IOException { List listOfFiles = new ArrayList<>(); - List files = new ArrayList<>(nodeFiles); - for (FileStatus file : files) { - String nodeName = file.getPath().getName(); + for (FileStatus thisNodeFile : nodeFiles) { + String nodeName = thisNodeFile.getPath().getName(); if ((nodeId == null || nodeId.isEmpty() || nodeName.contains(LogAggregationUtils .getNodeString(nodeId))) && !nodeName.endsWith( LogAggregationUtils.TMP_FILE_SUFFIX) && !nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) { - if (nodeName.equals(appId + ".har")) { - Path p = new Path("har:///" + file.getPath().toUri().getRawPath()); - files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p)); - continue; - } - listOfFiles.add(file); + listOfFiles.add(thisNodeFile); } } return listOfFiles; } + private List getAllNodeFiles( + RemoteIterator nodeFiles, ApplicationId appId) + throws IOException { + List listOfFiles = new ArrayList<>(); + while (nodeFiles != null && nodeFiles.hasNext()) { + FileStatus thisNodeFile = nodeFiles.next(); + String nodeName = thisNodeFile.getPath().getName(); + if (nodeName.equals(appId + ".har")) { + Path p = new Path("har:///" + + thisNodeFile.getPath().toUri().getRawPath()); + nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); + continue; + } + listOfFiles.add(thisNodeFile); + } + return listOfFiles; + } + @Private public FileStatus getAllChecksumFiles(Map fileMap, String fileName) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java index 9c02c1b1488..79226797578 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; @@ -27,6 +28,7 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintStream; import java.io.Writer; +import java.net.URL; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; @@ -364,6 +366,58 @@ public class TestLogAggregationIndexFileController { sysOutStream.reset(); } + @Test(timeout = 15000) + public void testFetchApplictionLogsHar() throws Exception { + List newLogTypes = new ArrayList<>(); + newLogTypes.add("syslog"); + newLogTypes.add("stdout"); + newLogTypes.add("stderr"); + newLogTypes.add("test1"); + newLogTypes.add("test2"); + URL harUrl = ClassLoader.getSystemClassLoader() + .getResource("application_123456_0001.har"); + assertNotNull(harUrl); + + Path path = new Path(remoteLogDir + "/" + USER_UGI.getShortUserName() + + "/logs/application_123456_0001"); + if (fs.exists(path)) { + fs.delete(path, true); + } + assertTrue(fs.mkdirs(path)); + Path harPath = new Path(path, "application_123456_0001.har"); + fs.copyFromLocalFile(false, new Path(harUrl.toURI()), harPath); + assertTrue(fs.exists(harPath)); + LogAggregationIndexedFileController fileFormat + = new LogAggregationIndexedFileController(); + fileFormat.initialize(conf, "Indexed"); + ContainerLogsRequest logRequest = new ContainerLogsRequest(); + logRequest.setAppId(appId); + logRequest.setNodeId(nodeId.toString()); + logRequest.setAppOwner(USER_UGI.getShortUserName()); + logRequest.setContainerId(containerId.toString()); + logRequest.setBytes(Long.MAX_VALUE); + List meta = fileFormat.readAggregatedLogsMeta( + logRequest); + Assert.assertEquals(meta.size(), 3); + List fileNames = new ArrayList<>(); + for (ContainerLogMeta log : meta) { + Assert.assertTrue(log.getContainerId().equals(containerId.toString())); + Assert.assertTrue(log.getNodeId().equals(nodeId.toString())); + for (ContainerLogFileInfo file : log.getContainerLogMeta()) { + fileNames.add(file.getFileName()); + } + } + fileNames.removeAll(newLogTypes); + Assert.assertTrue(fileNames.isEmpty()); + boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); + Assert.assertTrue(foundLogs); + for (String logType : newLogTypes) { + Assert.assertTrue(sysOutStream.toString().contains(logMessage( + containerId, logType))); + } + sysOutStream.reset(); + } + private File createAndWriteLocalLogFile(ContainerId containerId, Path localLogDir, String logType) throws IOException { File file = new File(localLogDir.toString(), logType); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS new file mode 100644 index 00000000000..e69de29bb2d diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index new file mode 100644 index 00000000000..b042846e1b0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index @@ -0,0 +1,3 @@ +%2F dir 1517728311922+493+xuan+supergroup 0 0 localhost_9999_1517727665265 localhost_9999_1517727668513 +%2Flocalhost_9999_1517727665265 file part-0 0 2895 1517728301581+420+xuan+supergroup +%2Flocalhost_9999_1517727668513 file part-0 2895 1228 1517728311919+420+xuan+supergroup diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex new file mode 100644 index 00000000000..cda8cbdcab0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex @@ -0,0 +1,2 @@ +3 +0 1897968749 0 280 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 new file mode 100644 index 0000000000000000000000000000000000000000..aea95fa8da369ef803242540b6edebc991d40de7 GIT binary patch literal 4123 zcmeHJ%}*0S6o35!M81S*;=vfA2SK}S`HUKa1VdOTA6f|Epmow6wrjRKWp)Y$BgT^n z`Uhwto-`gbUOl5RNP?gyUW_+Sns|}u**8lmeb97FKik2_TIkQ#cy4ko%!~`*@L_xa{>Qlggh^MmM7%pnT5IDL4)yY>B4uv8wyk?9Hrbw>tXPcm}v_r58oi8B#;t@8R#a- z^I<57u|$$n+wLI=Dxh+Cfk`Yq5E0z4bX-$}dzH4)Kt+y$uwe_FEwGqIRc!<&!Htem zHs#KHIw|*p3>=a^73ovyAa)%6`U=0yyg73HqX*pWP;4@ywUd}&T!jN7j+==yQ%fgX zE}{W|im@X3l&@(4ZhZnxR;`+B<(!Pmt)%aKI`(juOnbnkg4Z@C=<*K6yoi+hxVbZt z=%YL*Xc(7FSl~y)z@i&Golu@i%75-R5;P)6BALUmWX=_a!<;6VpgqVe8jul};mOkK zBGJd;5nF!yGAxy3S#k+DE;ET37Uj`UU427iQxt0<6qUEULJm^0>T$^NAUPvDLyr8F z97<$lFjG6BQs#SGCKag`!RU2DksMrptuitDx|xP`5hH>++;u{+EeID*1%%;X_NTMS z=onbT96AK3cb))2^o~q+#WUa4lF(8C1-8&A=MGEdnYr~a{W%zx5_w_nW-;S7mR1_m zy3AxTV#b**qG$11GjdLj2q+TN76Vva0N$aRZIVURuY^z)heM@*@{~A<=to4L^mo46 zyCo<4GKv|v4KfOjK)7T&Z$?Jlj#9Yyx{R)}v@fH(Uq&4@E?!poZRwXQRUid?r8CrJ zIz(V?H}Or&MMnkrq;A)(X$