YARN-7891. LogAggregationIndexedFileController should support read from HAR file. (Xuan Gong via wangda)
Change-Id: I16e081f21c5f80160564c49cc49d103bd8eb7e16
This commit is contained in:
parent
e718ac597f
commit
583f459431
|
@ -249,6 +249,10 @@
|
||||||
<exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude>
|
<exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude>
|
||||||
<exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude>
|
<exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude>
|
||||||
<exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude>
|
<exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude>
|
||||||
|
<exclude>src/test/resources/application_123456_0001.har/_index</exclude>
|
||||||
|
<exclude>src/test/resources/application_123456_0001.har/part-0</exclude>
|
||||||
|
<exclude>src/test/resources/application_123456_0001.har/_masterindex</exclude>
|
||||||
|
<exclude>src/test/resources/application_123456_0001.har/_SUCCESS</exclude>
|
||||||
</excludes>
|
</excludes>
|
||||||
</configuration>
|
</configuration>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
|
|
@ -495,16 +495,21 @@ public class LogAggregationIndexedFileController
|
||||||
boolean getAllContainers = (containerIdStr == null
|
boolean getAllContainers = (containerIdStr == null
|
||||||
|| containerIdStr.isEmpty());
|
|| containerIdStr.isEmpty());
|
||||||
long size = logRequest.getBytes();
|
long size = logRequest.getBytes();
|
||||||
List<FileStatus> nodeFiles = LogAggregationUtils
|
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
|
||||||
.getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(),
|
.getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(),
|
||||||
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
|
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
|
||||||
if (nodeFiles.isEmpty()) {
|
if (!nodeFiles.hasNext()) {
|
||||||
throw new IOException("There is no available log fils for "
|
throw new IOException("There is no available log fils for "
|
||||||
+ "application:" + appId);
|
+ "application:" + appId);
|
||||||
}
|
}
|
||||||
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
|
List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
|
||||||
|
if (allFiles.isEmpty()) {
|
||||||
|
throw new IOException("There is no available log fils for "
|
||||||
|
+ "application:" + appId);
|
||||||
|
}
|
||||||
|
Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
|
||||||
List<FileStatus> fileToRead = getNodeLogFileToRead(
|
List<FileStatus> fileToRead = getNodeLogFileToRead(
|
||||||
nodeFiles, nodeIdStr, appId);
|
allFiles, nodeIdStr, appId);
|
||||||
byte[] buf = new byte[65535];
|
byte[] buf = new byte[65535];
|
||||||
for (FileStatus thisNodeFile : fileToRead) {
|
for (FileStatus thisNodeFile : fileToRead) {
|
||||||
String nodeName = thisNodeFile.getPath().getName();
|
String nodeName = thisNodeFile.getPath().getName();
|
||||||
|
@ -609,16 +614,21 @@ public class LogAggregationIndexedFileController
|
||||||
containerIdStr.isEmpty());
|
containerIdStr.isEmpty());
|
||||||
String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
|
String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
|
||||||
: LogAggregationUtils.getNodeString(nodeId);
|
: LogAggregationUtils.getNodeString(nodeId);
|
||||||
List<FileStatus> nodeFiles = LogAggregationUtils
|
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
|
||||||
.getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir,
|
.getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir,
|
||||||
this.remoteRootLogDirSuffix);
|
this.remoteRootLogDirSuffix);
|
||||||
if (nodeFiles.isEmpty()) {
|
if (!nodeFiles.hasNext()) {
|
||||||
throw new IOException("There is no available log fils for "
|
throw new IOException("There is no available log fils for "
|
||||||
+ "application:" + appId);
|
+ "application:" + appId);
|
||||||
}
|
}
|
||||||
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
|
List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
|
||||||
|
if (allFiles.isEmpty()) {
|
||||||
|
throw new IOException("There is no available log fils for "
|
||||||
|
+ "application:" + appId);
|
||||||
|
}
|
||||||
|
Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
|
||||||
List<FileStatus> fileToRead = getNodeLogFileToRead(
|
List<FileStatus> fileToRead = getNodeLogFileToRead(
|
||||||
nodeFiles, nodeIdStr, appId);
|
allFiles, nodeIdStr, appId);
|
||||||
for(FileStatus thisNodeFile : fileToRead) {
|
for(FileStatus thisNodeFile : fileToRead) {
|
||||||
try {
|
try {
|
||||||
Long checkSumIndex = checkSumFiles.get(
|
Long checkSumIndex = checkSumFiles.get(
|
||||||
|
@ -727,25 +737,37 @@ public class LogAggregationIndexedFileController
|
||||||
List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
|
List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
List<FileStatus> listOfFiles = new ArrayList<>();
|
List<FileStatus> listOfFiles = new ArrayList<>();
|
||||||
List<FileStatus> files = new ArrayList<>(nodeFiles);
|
for (FileStatus thisNodeFile : nodeFiles) {
|
||||||
for (FileStatus file : files) {
|
String nodeName = thisNodeFile.getPath().getName();
|
||||||
String nodeName = file.getPath().getName();
|
|
||||||
if ((nodeId == null || nodeId.isEmpty()
|
if ((nodeId == null || nodeId.isEmpty()
|
||||||
|| nodeName.contains(LogAggregationUtils
|
|| nodeName.contains(LogAggregationUtils
|
||||||
.getNodeString(nodeId))) && !nodeName.endsWith(
|
.getNodeString(nodeId))) && !nodeName.endsWith(
|
||||||
LogAggregationUtils.TMP_FILE_SUFFIX) &&
|
LogAggregationUtils.TMP_FILE_SUFFIX) &&
|
||||||
!nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
|
!nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
|
||||||
if (nodeName.equals(appId + ".har")) {
|
listOfFiles.add(thisNodeFile);
|
||||||
Path p = new Path("har:///" + file.getPath().toUri().getRawPath());
|
|
||||||
files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
listOfFiles.add(file);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return listOfFiles;
|
return listOfFiles;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<FileStatus> getAllNodeFiles(
|
||||||
|
RemoteIterator<FileStatus> nodeFiles, ApplicationId appId)
|
||||||
|
throws IOException {
|
||||||
|
List<FileStatus> listOfFiles = new ArrayList<>();
|
||||||
|
while (nodeFiles != null && nodeFiles.hasNext()) {
|
||||||
|
FileStatus thisNodeFile = nodeFiles.next();
|
||||||
|
String nodeName = thisNodeFile.getPath().getName();
|
||||||
|
if (nodeName.equals(appId + ".har")) {
|
||||||
|
Path p = new Path("har:///"
|
||||||
|
+ thisNodeFile.getPath().toUri().getRawPath());
|
||||||
|
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
listOfFiles.add(thisNodeFile);
|
||||||
|
}
|
||||||
|
return listOfFiles;
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public FileStatus getAllChecksumFiles(Map<String, FileStatus> fileMap,
|
public FileStatus getAllChecksumFiles(Map<String, FileStatus> fileMap,
|
||||||
String fileName) {
|
String fileName) {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
|
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
@ -27,6 +28,7 @@ import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
|
import java.net.URL;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -364,6 +366,58 @@ public class TestLogAggregationIndexFileController {
|
||||||
sysOutStream.reset();
|
sysOutStream.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 15000)
|
||||||
|
public void testFetchApplictionLogsHar() throws Exception {
|
||||||
|
List<String> newLogTypes = new ArrayList<>();
|
||||||
|
newLogTypes.add("syslog");
|
||||||
|
newLogTypes.add("stdout");
|
||||||
|
newLogTypes.add("stderr");
|
||||||
|
newLogTypes.add("test1");
|
||||||
|
newLogTypes.add("test2");
|
||||||
|
URL harUrl = ClassLoader.getSystemClassLoader()
|
||||||
|
.getResource("application_123456_0001.har");
|
||||||
|
assertNotNull(harUrl);
|
||||||
|
|
||||||
|
Path path = new Path(remoteLogDir + "/" + USER_UGI.getShortUserName()
|
||||||
|
+ "/logs/application_123456_0001");
|
||||||
|
if (fs.exists(path)) {
|
||||||
|
fs.delete(path, true);
|
||||||
|
}
|
||||||
|
assertTrue(fs.mkdirs(path));
|
||||||
|
Path harPath = new Path(path, "application_123456_0001.har");
|
||||||
|
fs.copyFromLocalFile(false, new Path(harUrl.toURI()), harPath);
|
||||||
|
assertTrue(fs.exists(harPath));
|
||||||
|
LogAggregationIndexedFileController fileFormat
|
||||||
|
= new LogAggregationIndexedFileController();
|
||||||
|
fileFormat.initialize(conf, "Indexed");
|
||||||
|
ContainerLogsRequest logRequest = new ContainerLogsRequest();
|
||||||
|
logRequest.setAppId(appId);
|
||||||
|
logRequest.setNodeId(nodeId.toString());
|
||||||
|
logRequest.setAppOwner(USER_UGI.getShortUserName());
|
||||||
|
logRequest.setContainerId(containerId.toString());
|
||||||
|
logRequest.setBytes(Long.MAX_VALUE);
|
||||||
|
List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
|
||||||
|
logRequest);
|
||||||
|
Assert.assertEquals(meta.size(), 3);
|
||||||
|
List<String> fileNames = new ArrayList<>();
|
||||||
|
for (ContainerLogMeta log : meta) {
|
||||||
|
Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
|
||||||
|
Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
|
||||||
|
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
|
||||||
|
fileNames.add(file.getFileName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fileNames.removeAll(newLogTypes);
|
||||||
|
Assert.assertTrue(fileNames.isEmpty());
|
||||||
|
boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
|
||||||
|
Assert.assertTrue(foundLogs);
|
||||||
|
for (String logType : newLogTypes) {
|
||||||
|
Assert.assertTrue(sysOutStream.toString().contains(logMessage(
|
||||||
|
containerId, logType)));
|
||||||
|
}
|
||||||
|
sysOutStream.reset();
|
||||||
|
}
|
||||||
|
|
||||||
private File createAndWriteLocalLogFile(ContainerId containerId,
|
private File createAndWriteLocalLogFile(ContainerId containerId,
|
||||||
Path localLogDir, String logType) throws IOException {
|
Path localLogDir, String logType) throws IOException {
|
||||||
File file = new File(localLogDir.toString(), logType);
|
File file = new File(localLogDir.toString(), logType);
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
%2F dir 1517728311922+493+xuan+supergroup 0 0 localhost_9999_1517727665265 localhost_9999_1517727668513
|
||||||
|
%2Flocalhost_9999_1517727665265 file part-0 0 2895 1517728301581+420+xuan+supergroup
|
||||||
|
%2Flocalhost_9999_1517727668513 file part-0 2895 1228 1517728311919+420+xuan+supergroup
|
|
@ -0,0 +1,2 @@
|
||||||
|
3
|
||||||
|
0 1897968749 0 280
|
Binary file not shown.
Loading…
Reference in New Issue