YARN-10031. Create a general purpose log request with additional query parameters. Contributed by Andras Gyori
This commit is contained in:
parent
9ed737001c
commit
3234e5eaf3
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.mapreduce.v2.hs.webapp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -69,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
|
||||
import org.apache.hadoop.yarn.server.webapp.WrappedLogMetaRequest;
|
||||
import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
|
||||
import org.apache.hadoop.yarn.server.webapp.LogServlet;
|
||||
|
@ -441,6 +443,31 @@ public class HsWebServices extends WebServices {
|
|||
return logServlet.getRemoteLogDirPath(user, appIdStr);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/extended-log-query")
|
||||
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public Response getAggregatedLogsMeta(@Context HttpServletRequest hsr,
|
||||
@QueryParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String fileName,
|
||||
@QueryParam(YarnWebServiceParams.FILESIZE) Set<String> fileSize,
|
||||
@QueryParam(YarnWebServiceParams.MODIFICATION_TIME) Set<String>
|
||||
modificationTime,
|
||||
@QueryParam(YarnWebServiceParams.APP_ID) String appIdStr,
|
||||
@QueryParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
|
||||
@QueryParam(YarnWebServiceParams.NM_ID) String nmId) throws IOException {
|
||||
init();
|
||||
ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder logsRequest =
|
||||
new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
|
||||
logsRequest.setAppId(appIdStr);
|
||||
logsRequest.setFileName(fileName);
|
||||
logsRequest.setContainerId(containerIdStr);
|
||||
logsRequest.setFileSize(fileSize);
|
||||
logsRequest.setModificationTime(modificationTime);
|
||||
logsRequest.setNodeId(nmId);
|
||||
return logServlet.getContainerLogsInfo(hsr, logsRequest);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/aggregatedlogs")
|
||||
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
|
||||
|
|
|
@ -0,0 +1,291 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.regex.PatternSyntaxException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Represents a query of log metadata with extended filtering capabilities.
|
||||
*/
|
||||
public class ExtendedLogMetaRequest {
|
||||
private final String user;
|
||||
private final String appId;
|
||||
private final String containerId;
|
||||
private final MatchExpression nodeId;
|
||||
private final MatchExpression fileName;
|
||||
private final ComparisonCollection fileSize;
|
||||
private final ComparisonCollection modificationTime;
|
||||
|
||||
public ExtendedLogMetaRequest(
|
||||
String user, String appId, String containerId, MatchExpression nodeId,
|
||||
MatchExpression fileName, ComparisonCollection fileSize,
|
||||
ComparisonCollection modificationTime) {
|
||||
this.user = user;
|
||||
this.appId = appId;
|
||||
this.containerId = containerId;
|
||||
this.nodeId = nodeId;
|
||||
this.fileName = fileName;
|
||||
this.fileSize = fileSize;
|
||||
this.modificationTime = modificationTime;
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
public String getAppId() {
|
||||
return appId;
|
||||
}
|
||||
|
||||
public String getContainerId() {
|
||||
return containerId;
|
||||
}
|
||||
|
||||
public MatchExpression getNodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
public MatchExpression getFileName() {
|
||||
return fileName;
|
||||
}
|
||||
|
||||
public ComparisonCollection getFileSize() {
|
||||
return fileSize;
|
||||
}
|
||||
|
||||
public ComparisonCollection getModificationTime() {
|
||||
return modificationTime;
|
||||
}
|
||||
|
||||
public static class ExtendedLogMetaRequestBuilder {
|
||||
private String user;
|
||||
private String appId;
|
||||
private String containerId;
|
||||
private MatchExpression nodeId = new MatchExpression(null);
|
||||
private MatchExpression fileName = new MatchExpression(null);
|
||||
private ComparisonCollection fileSize = new ComparisonCollection(null);
|
||||
private ComparisonCollection modificationTime =
|
||||
new ComparisonCollection(null);
|
||||
|
||||
public ExtendedLogMetaRequestBuilder setUser(String userName) {
|
||||
this.user = userName;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExtendedLogMetaRequestBuilder setAppId(String applicationId) {
|
||||
this.appId = applicationId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExtendedLogMetaRequestBuilder setContainerId(String container) {
|
||||
this.containerId = container;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExtendedLogMetaRequestBuilder setNodeId(String node) {
|
||||
try {
|
||||
this.nodeId = new MatchExpression(node);
|
||||
} catch (PatternSyntaxException e) {
|
||||
throw new IllegalArgumentException("Node Id expression is invalid", e);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExtendedLogMetaRequestBuilder setFileName(String file) {
|
||||
try {
|
||||
this.fileName = new MatchExpression(file);
|
||||
} catch (PatternSyntaxException e) {
|
||||
throw new IllegalArgumentException("Filename expression is invalid", e);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExtendedLogMetaRequestBuilder setFileSize(Set<String> fileSizes) {
|
||||
this.fileSize = new ComparisonCollection(fileSizes);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExtendedLogMetaRequestBuilder setModificationTime(
|
||||
Set<String> modificationTimes) {
|
||||
this.modificationTime = new ComparisonCollection(modificationTimes);
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isUserSet() {
|
||||
return user != null;
|
||||
}
|
||||
|
||||
public ExtendedLogMetaRequest build() {
|
||||
return new ExtendedLogMetaRequest(user, appId, containerId, nodeId,
|
||||
fileName, fileSize, modificationTime);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A collection of {@code ComparisonExpression}.
|
||||
*/
|
||||
public static class ComparisonCollection {
|
||||
private List<ComparisonExpression> comparisonExpressions;
|
||||
|
||||
public ComparisonCollection(Set<String> expressions) {
|
||||
if (expressions == null) {
|
||||
this.comparisonExpressions = Collections.emptyList();
|
||||
} else {
|
||||
List<String> equalExpressions = expressions.stream().filter(
|
||||
e -> !e.startsWith(ComparisonExpression.GREATER_OPERATOR) &&
|
||||
!e.startsWith(ComparisonExpression.LESSER_OPERATOR))
|
||||
.collect(Collectors.toList());
|
||||
if (equalExpressions.size() > 1) {
|
||||
throw new IllegalArgumentException(
|
||||
"Can not process more, than one exact match. Matches: "
|
||||
+ String.join(" ", equalExpressions));
|
||||
}
|
||||
|
||||
this.comparisonExpressions = expressions.stream()
|
||||
.map(ComparisonExpression::new).collect(Collectors.toList());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public boolean match(Long value) {
|
||||
return match(value, true);
|
||||
}
|
||||
|
||||
public boolean match(String value) {
|
||||
if (value == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return match(Long.valueOf(value), true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks, if the given value matches all the {@code ComparisonExpression}.
|
||||
* This implies an AND logic between the expressions.
|
||||
* @param value given value to match against
|
||||
* @param defaultValue default value to return when no expression is defined
|
||||
* @return whether all expressions were matched
|
||||
*/
|
||||
public boolean match(Long value, boolean defaultValue) {
|
||||
if (comparisonExpressions.isEmpty()) {
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
return comparisonExpressions.stream()
|
||||
.allMatch(expr -> expr.match(value));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a comparison logic based on a stringified expression.
|
||||
* The format of the expression is:
|
||||
* >value = is greater than value
|
||||
* <value = is lower than value
|
||||
* value = is equal to value
|
||||
*/
|
||||
public static class ComparisonExpression {
|
||||
public static final String GREATER_OPERATOR = ">";
|
||||
public static final String LESSER_OPERATOR = "<";
|
||||
|
||||
private String expression;
|
||||
private Predicate<Long> comparisonFn;
|
||||
private Long convertedValue;
|
||||
|
||||
public ComparisonExpression(String expression) {
|
||||
if (expression == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (expression.startsWith(GREATER_OPERATOR)) {
|
||||
convertedValue = Long.parseLong(expression.substring(1));
|
||||
comparisonFn = a -> a > convertedValue;
|
||||
} else if (expression.startsWith(LESSER_OPERATOR)) {
|
||||
convertedValue = Long.parseLong(expression.substring(1));
|
||||
comparisonFn = a -> a < convertedValue;
|
||||
} else {
|
||||
convertedValue = Long.parseLong(expression);
|
||||
comparisonFn = a -> a.equals(convertedValue);
|
||||
}
|
||||
|
||||
this.expression = expression;
|
||||
}
|
||||
|
||||
public boolean match(String value) {
|
||||
return match(Long.valueOf(value), true);
|
||||
}
|
||||
|
||||
public boolean match(Long value) {
|
||||
return match(value, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the given value with the defined comparison functions based on
|
||||
* stringified expression.
|
||||
* @param value value to test with
|
||||
* @param defaultValue value to return when no expression was defined
|
||||
* @return comparison test result or the given default value
|
||||
*/
|
||||
public boolean match(Long value, boolean defaultValue) {
|
||||
if (expression == null) {
|
||||
return defaultValue;
|
||||
} else {
|
||||
return comparisonFn.test(value);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return convertedValue != null ? String.valueOf(convertedValue) : "";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a regex matcher.
|
||||
*/
|
||||
public static class MatchExpression {
|
||||
private Pattern expression;
|
||||
|
||||
public MatchExpression(String expression) {
|
||||
this.expression = expression != null ? Pattern.compile(expression) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Matches the value on the expression.
|
||||
* @param value value to be matched against
|
||||
* @return result of the match or true, if no expression was defined
|
||||
*/
|
||||
public boolean match(String value) {
|
||||
return expression == null || expression.matcher(value).matches();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return expression != null ? expression.pattern() : "";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,143 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.HarFs;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Facilitates an extended query of aggregated log file metadata with
|
||||
* the help of file controllers.
|
||||
*/
|
||||
public class LogAggregationMetaCollector {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
LogAggregationMetaCollector.class);
|
||||
|
||||
private final ExtendedLogMetaRequest logsRequest;
|
||||
private final Configuration conf;
|
||||
|
||||
public LogAggregationMetaCollector(
|
||||
ExtendedLogMetaRequest logsRequest, Configuration conf) {
|
||||
this.logsRequest = logsRequest;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Collects all log file metadata based on the complex query defined in
|
||||
* {@code UserLogsRequest}.
|
||||
* @param fileController log aggregation file format controller
|
||||
* @return collection of log file metadata grouped by containers
|
||||
* @throws IOException if node file is not reachable
|
||||
*/
|
||||
public List<ContainerLogMeta> collect(
|
||||
LogAggregationFileController fileController) throws IOException {
|
||||
List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
|
||||
RemoteIterator<FileStatus> appDirs = fileController.
|
||||
getApplicationDirectoriesOfUser(logsRequest.getUser());
|
||||
|
||||
while (appDirs.hasNext()) {
|
||||
FileStatus currentAppDir = appDirs.next();
|
||||
if (logsRequest.getAppId() == null ||
|
||||
logsRequest.getAppId().equals(currentAppDir.getPath().getName())) {
|
||||
ApplicationId appId = ApplicationId.fromString(
|
||||
currentAppDir.getPath().getName());
|
||||
RemoteIterator<FileStatus> nodeFiles = fileController
|
||||
.getNodeFilesOfApplicationDirectory(currentAppDir);
|
||||
|
||||
while (nodeFiles.hasNext()) {
|
||||
FileStatus currentNodeFile = nodeFiles.next();
|
||||
if (!logsRequest.getNodeId().match(currentNodeFile.getPath()
|
||||
.getName())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (currentNodeFile.getPath().getName().equals(
|
||||
logsRequest.getAppId() + ".har")) {
|
||||
Path p = new Path("har:///"
|
||||
+ currentNodeFile.getPath().toUri().getRawPath());
|
||||
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
Map<String, List<ContainerLogFileInfo>> metaFiles = fileController
|
||||
.getLogMetaFilesOfNode(logsRequest, currentNodeFile, appId);
|
||||
if (metaFiles == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
metaFiles.entrySet().removeIf(entry ->
|
||||
!(logsRequest.getContainerId() == null ||
|
||||
logsRequest.getContainerId().equals(entry.getKey())));
|
||||
|
||||
containersLogMeta.addAll(createContainerLogMetas(
|
||||
currentNodeFile.getPath().getName(), metaFiles));
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Can not get log meta from the log file:"
|
||||
+ currentNodeFile.getPath() + "\n" + ioe.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return containersLogMeta;
|
||||
}
|
||||
|
||||
private List<ContainerLogMeta> createContainerLogMetas(
|
||||
String nodeId, Map<String, List<ContainerLogFileInfo>> metaFiles) {
|
||||
List<ContainerLogMeta> containerLogMetas = new ArrayList<>();
|
||||
for (Map.Entry<String, List<ContainerLogFileInfo>> containerLogs
|
||||
: metaFiles.entrySet()) {
|
||||
ContainerLogMeta containerLogMeta = new ContainerLogMeta(
|
||||
containerLogs.getKey(), nodeId);
|
||||
for (ContainerLogFileInfo file : containerLogs.getValue()) {
|
||||
boolean isFileNameMatches = logsRequest.getFileName()
|
||||
.match(file.getFileName());
|
||||
boolean fileSizeComparison = logsRequest.getFileSize()
|
||||
.match(file.getFileSize());
|
||||
boolean modificationTimeComparison = logsRequest.getModificationTime()
|
||||
.match(file.getLastModifiedTime());
|
||||
|
||||
if (!isFileNameMatches || !fileSizeComparison ||
|
||||
!modificationTimeComparison) {
|
||||
continue;
|
||||
}
|
||||
containerLogMeta.getContainerLogMeta().add(file);
|
||||
}
|
||||
if (!containerLogMeta.getContainerLogMeta().isEmpty()) {
|
||||
containerLogMetas.add(containerLogMeta);
|
||||
}
|
||||
}
|
||||
return containerLogMetas;
|
||||
}
|
||||
}
|
|
@ -29,10 +29,13 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
@Private
|
||||
public class LogAggregationUtils {
|
||||
|
@ -295,19 +298,8 @@ public class LogAggregationUtils {
|
|||
// Return both new and old node files combined
|
||||
RemoteIterator<FileStatus> curDir = nodeFilesCur;
|
||||
RemoteIterator<FileStatus> prevDir = nodeFilesPrev;
|
||||
RemoteIterator<FileStatus> nodeFilesCombined = new
|
||||
RemoteIterator<FileStatus>() {
|
||||
@Override
|
||||
public boolean hasNext() throws IOException {
|
||||
return prevDir.hasNext() || curDir.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus next() throws IOException {
|
||||
return prevDir.hasNext() ? prevDir.next() : curDir.next();
|
||||
}
|
||||
};
|
||||
return nodeFilesCombined;
|
||||
return combineIterators(prevDir, curDir);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -368,4 +360,93 @@ public class LogAggregationUtils {
|
|||
return nodeFiles;
|
||||
}
|
||||
|
||||
public static RemoteIterator<FileStatus> getRemoteFiles(
|
||||
Configuration conf, Path appPath) throws IOException {
|
||||
|
||||
Path qualifiedLogDir =
|
||||
FileContext.getFileContext(conf).makeQualified(appPath);
|
||||
return FileContext.getFileContext(
|
||||
qualifiedLogDir.toUri(), conf).listStatus(appPath);
|
||||
}
|
||||
|
||||
public static RemoteIterator<FileStatus> getUserRemoteLogDir(
|
||||
Configuration conf, String user, Path remoteRootLogDir,
|
||||
String remoteRootLogDirSuffix) throws IOException {
|
||||
Path userPath = LogAggregationUtils.getRemoteLogSuffixedDir(
|
||||
remoteRootLogDir, user, remoteRootLogDirSuffix);
|
||||
final RemoteIterator<FileStatus> userRootDirFiles =
|
||||
getRemoteFiles(conf, userPath);
|
||||
|
||||
RemoteIterator<FileStatus> newDirs = new RemoteIterator<FileStatus>() {
|
||||
private RemoteIterator<FileStatus> currentBucketDir =
|
||||
LogAggregationUtils.getSubDir(conf, userRootDirFiles);
|
||||
@Override
|
||||
public boolean hasNext() throws IOException {
|
||||
return currentBucketDir != null && currentBucketDir.hasNext() ||
|
||||
userRootDirFiles.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus next() throws IOException {
|
||||
FileStatus next = null;
|
||||
while (next == null) {
|
||||
if (currentBucketDir != null && currentBucketDir.hasNext()) {
|
||||
next = currentBucketDir.next();
|
||||
} else if (userRootDirFiles.hasNext()) {
|
||||
currentBucketDir = LogAggregationUtils.getSubDir(
|
||||
conf, userRootDirFiles);
|
||||
} else {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
}
|
||||
return next;
|
||||
}
|
||||
};
|
||||
|
||||
RemoteIterator<FileStatus> allDir = newDirs;
|
||||
if (LogAggregationUtils.isOlderPathEnabled(conf)) {
|
||||
try {
|
||||
Path oldPath = LogAggregationUtils.getOlderRemoteLogSuffixedDir(
|
||||
remoteRootLogDir, user, remoteRootLogDirSuffix);
|
||||
final RemoteIterator<FileStatus> oldUserRootDirFiles =
|
||||
getRemoteFiles(conf, oldPath);
|
||||
allDir = combineIterators(oldUserRootDirFiles, newDirs);
|
||||
} catch (FileNotFoundException e) {
|
||||
return newDirs;
|
||||
}
|
||||
}
|
||||
|
||||
return allDir;
|
||||
}
|
||||
|
||||
private static RemoteIterator<FileStatus> getSubDir(
|
||||
Configuration conf, RemoteIterator<FileStatus> rootDir)
|
||||
throws IOException {
|
||||
if (rootDir.hasNext()) {
|
||||
Path userPath = rootDir.next().getPath();
|
||||
Path qualifiedLogDir =
|
||||
FileContext.getFileContext(conf).makeQualified(userPath);
|
||||
return FileContext.getFileContext(
|
||||
qualifiedLogDir.toUri(), conf).listStatus(userPath);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static RemoteIterator<FileStatus> combineIterators(
|
||||
RemoteIterator<FileStatus> first, RemoteIterator<FileStatus> second) {
|
||||
return new RemoteIterator<FileStatus>() {
|
||||
@Override
|
||||
public boolean hasNext() throws IOException {
|
||||
return first.hasNext() || second.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus next() throws IOException {
|
||||
return first.hasNext() ? first.next() : second.next();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
@ -53,7 +54,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
|
||||
import org.apache.hadoop.yarn.webapp.View.ViewContext;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -224,6 +227,49 @@ public abstract class LogAggregationFileController {
|
|||
public abstract List<ContainerLogMeta> readAggregatedLogsMeta(
|
||||
ContainerLogsRequest logRequest) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns log file metadata for a node grouped by containers.
|
||||
*
|
||||
* @param logRequest extended query information holder
|
||||
* @param currentNodeFile file status of a node in an application directory
|
||||
* @param appId id of the application, which is the same as in node path
|
||||
* @return log file metadata
|
||||
* @throws IOException if there is no node file
|
||||
*/
|
||||
public Map<String, List<ContainerLogFileInfo>> getLogMetaFilesOfNode(
|
||||
ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile,
|
||||
ApplicationId appId) throws IOException {
|
||||
LOG.info("User aggregated complex log queries " +
|
||||
"are not implemented for this file controller");
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets all application directories of a user.
|
||||
*
|
||||
* @param user name of the user
|
||||
* @return a lazy iterator of directories
|
||||
* @throws IOException if user directory does not exist
|
||||
*/
|
||||
public RemoteIterator<FileStatus> getApplicationDirectoriesOfUser(
|
||||
String user) throws IOException {
|
||||
return LogAggregationUtils.getUserRemoteLogDir(
|
||||
conf, user, getRemoteRootLogDir(), getRemoteRootLogDirSuffix());
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets all node files in an application directory.
|
||||
*
|
||||
* @param appDir application directory
|
||||
* @return a lazy iterator of files
|
||||
* @throws IOException if file context is not reachable
|
||||
*/
|
||||
public RemoteIterator<FileStatus> getNodeFilesOfApplicationDirectory(
|
||||
FileStatus appDir) throws IOException {
|
||||
return LogAggregationUtils
|
||||
.getRemoteFiles(conf, appDir.getPath());
|
||||
}
|
||||
|
||||
/**
|
||||
* Render Aggregated Logs block.
|
||||
* @param html the html
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.io.InputStream;
|
|||
import java.io.OutputStream;
|
||||
import java.io.Serializable;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
@ -74,12 +75,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
||||
import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
|
@ -610,6 +613,45 @@ public class LogAggregationIndexedFileController
|
|||
return findLogs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, List<ContainerLogFileInfo>> getLogMetaFilesOfNode(
|
||||
ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile,
|
||||
ApplicationId appId) throws IOException {
|
||||
Map<String, List<ContainerLogFileInfo>> logMetaFiles = new HashMap<>();
|
||||
|
||||
Long checkSumIndex = parseChecksum(currentNodeFile);
|
||||
long endIndex = -1;
|
||||
if (checkSumIndex != null) {
|
||||
endIndex = checkSumIndex;
|
||||
}
|
||||
IndexedLogsMeta current = loadIndexedLogsMeta(
|
||||
currentNodeFile.getPath(), endIndex, appId);
|
||||
if (current != null) {
|
||||
for (IndexedPerAggregationLogMeta logMeta :
|
||||
current.getLogMetas()) {
|
||||
for (Entry<String, List<IndexedFileLogMeta>> log : logMeta
|
||||
.getLogMetas().entrySet()) {
|
||||
String currentContainerId = log.getKey();
|
||||
if (!(logRequest.getContainerId() == null ||
|
||||
logRequest.getContainerId().equals(currentContainerId))) {
|
||||
continue;
|
||||
}
|
||||
logMetaFiles.put(currentContainerId, new ArrayList<>());
|
||||
for (IndexedFileLogMeta aMeta : log.getValue()) {
|
||||
ContainerLogFileInfo file = new ContainerLogFileInfo();
|
||||
file.setFileName(aMeta.getFileName());
|
||||
file.setFileSize(Long.toString(aMeta.getFileSize()));
|
||||
file.setLastModifiedTime(
|
||||
Long.toString(aMeta.getLastModifiedTime()));
|
||||
logMetaFiles.get(currentContainerId).add(file);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return logMetaFiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerLogMeta> readAggregatedLogsMeta(
|
||||
ContainerLogsRequest logRequest) throws IOException {
|
||||
|
@ -743,6 +785,40 @@ public class LogAggregationIndexedFileController
|
|||
return checkSumFiles;
|
||||
}
|
||||
|
||||
private Long parseChecksum(FileStatus file) {
|
||||
if (!file.getPath().getName().endsWith(CHECK_SUM_FILE_SUFFIX)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
FSDataInputStream checksumFileInputStream = null;
|
||||
try {
|
||||
FileContext fileContext = FileContext
|
||||
.getFileContext(file.getPath().toUri(), conf);
|
||||
String nodeName = null;
|
||||
long index = 0L;
|
||||
checksumFileInputStream = fileContext.open(file.getPath());
|
||||
int nameLength = checksumFileInputStream.readInt();
|
||||
byte[] b = new byte[nameLength];
|
||||
int actualLength = checksumFileInputStream.read(b);
|
||||
if (actualLength == nameLength) {
|
||||
nodeName = new String(b, StandardCharsets.UTF_8);
|
||||
index = checksumFileInputStream.readLong();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
if (!nodeName.isEmpty()) {
|
||||
return index;
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.warn(ex.getMessage());
|
||||
return null;
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Private
|
||||
public List<FileStatus> getNodeLogFileToRead(
|
||||
List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
|
||||
|
|
|
@ -25,10 +25,13 @@ import java.io.OutputStream;
|
|||
import java.nio.charset.Charset;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
|
||||
import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.commons.math3.util.Pair;
|
||||
|
@ -258,6 +261,58 @@ public class LogAggregationTFileController
|
|||
return findLogs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, List<ContainerLogFileInfo>> getLogMetaFilesOfNode(
|
||||
ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile,
|
||||
ApplicationId appId) throws IOException {
|
||||
Map<String, List<ContainerLogFileInfo>> logMetaFiles = new HashMap<>();
|
||||
Path nodePath = currentNodeFile.getPath();
|
||||
|
||||
LogReader reader =
|
||||
new LogReader(conf,
|
||||
nodePath);
|
||||
try {
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
while (valueStream != null) {
|
||||
if (logRequest.getContainerId() == null ||
|
||||
logRequest.getContainerId().equals(key.toString())) {
|
||||
logMetaFiles.put(key.toString(), new ArrayList<>());
|
||||
fillMetaFiles(currentNodeFile, valueStream,
|
||||
logMetaFiles.get(key.toString()));
|
||||
}
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
return logMetaFiles;
|
||||
}
|
||||
|
||||
private void fillMetaFiles(
|
||||
FileStatus currentNodeFile, DataInputStream valueStream,
|
||||
List<ContainerLogFileInfo> logMetaFiles)
|
||||
throws IOException {
|
||||
while (true) {
|
||||
try {
|
||||
Pair<String, String> logMeta =
|
||||
LogReader.readContainerMetaDataAndSkipData(
|
||||
valueStream);
|
||||
ContainerLogFileInfo logMetaFile = new ContainerLogFileInfo();
|
||||
logMetaFile.setLastModifiedTime(
|
||||
Long.toString(currentNodeFile.getModificationTime()));
|
||||
logMetaFile.setFileName(logMeta.getFirst());
|
||||
logMetaFile.setFileSize(logMeta.getSecond());
|
||||
logMetaFiles.add(logMetaFile);
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerLogMeta> readAggregatedLogsMeta(
|
||||
ContainerLogsRequest logRequest) throws IOException {
|
||||
|
|
|
@ -0,0 +1,391 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation;
|
||||
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.FakeLogAggregationFileController;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Clock;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestLogAggregationMetaCollector {
|
||||
private static final String TEST_NODE = "TEST_NODE_1";
|
||||
private static final String TEST_NODE_2 = "TEST_NODE_2";
|
||||
private static final String BIG_FILE_NAME = "TEST_BIG";
|
||||
private static final String SMALL_FILE_NAME = "TEST_SMALL";
|
||||
|
||||
private static ApplicationId app = ApplicationId.newInstance(
|
||||
Clock.systemDefaultZone().millis(), 1);
|
||||
private static ApplicationId app2 = ApplicationId.newInstance(
|
||||
Clock.systemDefaultZone().millis(), 2);
|
||||
|
||||
private static ApplicationAttemptId appAttempt =
|
||||
ApplicationAttemptId.newInstance(app, 1);
|
||||
private static ApplicationAttemptId app2Attempt =
|
||||
ApplicationAttemptId.newInstance(app2, 1);
|
||||
|
||||
private static ContainerId attemptContainer =
|
||||
ContainerId.newContainerId(appAttempt, 1);
|
||||
private static ContainerId attemptContainer2 =
|
||||
ContainerId.newContainerId(appAttempt, 2);
|
||||
|
||||
private static ContainerId attempt2Container =
|
||||
ContainerId.newContainerId(app2Attempt, 1);
|
||||
private static ContainerId attempt2Container2 =
|
||||
ContainerId.newContainerId(app2Attempt, 2);
|
||||
|
||||
private FakeNodeFileController fileController;
|
||||
|
||||
private static class FakeNodeFileController
|
||||
extends FakeLogAggregationFileController {
|
||||
private Map<ImmutablePair<String, String>,
|
||||
Map<String, List<ContainerLogFileInfo>>> logFiles;
|
||||
private List<FileStatus> appDirs;
|
||||
private List<FileStatus> nodeFiles;
|
||||
|
||||
FakeNodeFileController(
|
||||
Map<ImmutablePair<String, String>, Map<String,
|
||||
List<ContainerLogFileInfo>>> logFiles, List<FileStatus> appDirs,
|
||||
List<FileStatus> nodeFiles) {
|
||||
this.logFiles = logFiles;
|
||||
this.appDirs = appDirs;
|
||||
this.nodeFiles = nodeFiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteIterator<FileStatus> getApplicationDirectoriesOfUser(
|
||||
String user) throws IOException {
|
||||
return new RemoteIterator<FileStatus>() {
|
||||
private Iterator<FileStatus> iter = appDirs.iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() throws IOException {
|
||||
return iter.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus next() throws IOException {
|
||||
return iter.next();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteIterator<FileStatus> getNodeFilesOfApplicationDirectory(
|
||||
FileStatus appDir) throws IOException {
|
||||
return new RemoteIterator<FileStatus>() {
|
||||
private Iterator<FileStatus> iter = nodeFiles.iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() throws IOException {
|
||||
return iter.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus next() throws IOException {
|
||||
return iter.next();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, List<ContainerLogFileInfo>> getLogMetaFilesOfNode(
|
||||
ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile,
|
||||
ApplicationId appId) throws IOException {
|
||||
return logFiles.get(new ImmutablePair<>(appId.toString(),
|
||||
currentNodeFile.getPath().getName()));
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
fileController = createFileController();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllNull() throws IOException {
|
||||
ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
|
||||
new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
|
||||
request.setAppId(null);
|
||||
request.setContainerId(null);
|
||||
request.setFileName(null);
|
||||
request.setFileSize(null);
|
||||
request.setModificationTime(null);
|
||||
request.setNodeId(null);
|
||||
request.setUser(null);
|
||||
|
||||
LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
|
||||
request.build(), new YarnConfiguration());
|
||||
List<ContainerLogMeta> res = collector.collect(fileController);
|
||||
|
||||
List<ContainerLogFileInfo> allFile = res.stream()
|
||||
.flatMap(m -> m.getContainerLogMeta().stream())
|
||||
.collect(Collectors.toList());
|
||||
assertEquals(8, allFile.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllSet() throws IOException {
|
||||
ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
|
||||
new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
|
||||
Set<String> fileSizeExpressions = new HashSet<>();
|
||||
fileSizeExpressions.add("<51");
|
||||
Set<String> modificationTimeExpressions = new HashSet<>();
|
||||
modificationTimeExpressions.add("<1000");
|
||||
request.setAppId(app.toString());
|
||||
request.setContainerId(attemptContainer.toString());
|
||||
request.setFileName(String.format("%s.*", SMALL_FILE_NAME));
|
||||
request.setFileSize(fileSizeExpressions);
|
||||
request.setModificationTime(modificationTimeExpressions);
|
||||
request.setNodeId(TEST_NODE);
|
||||
request.setUser("TEST");
|
||||
|
||||
LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
|
||||
request.build(), new YarnConfiguration());
|
||||
List<ContainerLogMeta> res = collector.collect(fileController);
|
||||
|
||||
List<ContainerLogFileInfo> allFile = res.stream()
|
||||
.flatMap(m -> m.getContainerLogMeta().stream())
|
||||
.collect(Collectors.toList());
|
||||
assertEquals(1, allFile.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleNodeRequest() throws IOException {
|
||||
ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
|
||||
new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
|
||||
request.setAppId(null);
|
||||
request.setContainerId(null);
|
||||
request.setFileName(null);
|
||||
request.setFileSize(null);
|
||||
request.setModificationTime(null);
|
||||
request.setNodeId(TEST_NODE);
|
||||
request.setUser(null);
|
||||
|
||||
LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
|
||||
request.build(), new YarnConfiguration());
|
||||
List<ContainerLogMeta> res = collector.collect(fileController);
|
||||
|
||||
List<ContainerLogFileInfo> allFile = res.stream()
|
||||
.flatMap(m -> m.getContainerLogMeta().stream())
|
||||
.collect(Collectors.toList());
|
||||
assertEquals(4, allFile.stream().
|
||||
filter(f -> f.getFileName().contains(TEST_NODE)).count());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleNodeRegexRequest() throws IOException {
|
||||
ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
|
||||
new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
|
||||
request.setAppId(null);
|
||||
request.setContainerId(null);
|
||||
request.setFileName(null);
|
||||
request.setFileSize(null);
|
||||
request.setModificationTime(null);
|
||||
request.setNodeId("TEST_NODE_.*");
|
||||
request.setUser(null);
|
||||
|
||||
LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
|
||||
request.build(), new YarnConfiguration());
|
||||
List<ContainerLogMeta> res = collector.collect(fileController);
|
||||
|
||||
List<ContainerLogFileInfo> allFile = res.stream()
|
||||
.flatMap(m -> m.getContainerLogMeta().stream())
|
||||
.collect(Collectors.toList());
|
||||
assertEquals(8, allFile.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleFileRegex() throws IOException {
|
||||
ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
|
||||
new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
|
||||
request.setAppId(null);
|
||||
request.setContainerId(null);
|
||||
request.setFileName(String.format("%s.*", BIG_FILE_NAME));
|
||||
request.setFileSize(null);
|
||||
request.setModificationTime(null);
|
||||
request.setNodeId(null);
|
||||
request.setUser(null);
|
||||
|
||||
LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
|
||||
request.build(), new YarnConfiguration());
|
||||
List<ContainerLogMeta> res = collector.collect(fileController);
|
||||
|
||||
List<ContainerLogFileInfo> allFile = res.stream()
|
||||
.flatMap(m -> m.getContainerLogMeta().stream())
|
||||
.collect(Collectors.toList());
|
||||
assertEquals(4, allFile.size());
|
||||
assertTrue(allFile.stream().allMatch(
|
||||
f -> f.getFileName().contains(BIG_FILE_NAME)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerIdExactMatch() throws IOException {
|
||||
ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
|
||||
new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
|
||||
request.setAppId(null);
|
||||
request.setContainerId(attemptContainer.toString());
|
||||
request.setFileName(null);
|
||||
request.setFileSize(null);
|
||||
request.setModificationTime(null);
|
||||
request.setNodeId(null);
|
||||
request.setUser(null);
|
||||
|
||||
LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
|
||||
request.build(), new YarnConfiguration());
|
||||
List<ContainerLogMeta> res = collector.collect(fileController);
|
||||
|
||||
List<ContainerLogFileInfo> allFile = res.stream()
|
||||
.flatMap(m -> m.getContainerLogMeta().stream())
|
||||
.collect(Collectors.toList());
|
||||
assertEquals(2, allFile.size());
|
||||
assertTrue(allFile.stream().allMatch(
|
||||
f -> f.getFileName().contains(attemptContainer.toString())));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleFileBetweenSize() throws IOException {
|
||||
ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
|
||||
new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
|
||||
Set<String> fileSizeExpressions = new HashSet<>();
|
||||
fileSizeExpressions.add(">50");
|
||||
fileSizeExpressions.add("<101");
|
||||
request.setAppId(null);
|
||||
request.setContainerId(null);
|
||||
request.setFileName(null);
|
||||
request.setFileSize(fileSizeExpressions);
|
||||
request.setModificationTime(null);
|
||||
request.setNodeId(null);
|
||||
request.setUser(null);
|
||||
|
||||
LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
|
||||
request.build(), new YarnConfiguration());
|
||||
List<ContainerLogMeta> res = collector.collect(fileController);
|
||||
|
||||
List<ContainerLogFileInfo> allFile = res.stream()
|
||||
.flatMap(m -> m.getContainerLogMeta().stream())
|
||||
.collect(Collectors.toList());
|
||||
assertEquals(4, allFile.size());
|
||||
assertTrue(allFile.stream().allMatch(
|
||||
f -> f.getFileSize().equals("100")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidQueryStrings() throws IOException {
|
||||
ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request =
|
||||
new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder();
|
||||
Set<String> fileSizeExpressions = new HashSet<>();
|
||||
fileSizeExpressions.add("50");
|
||||
fileSizeExpressions.add("101");
|
||||
try {
|
||||
request.setFileName("*");
|
||||
fail("An error should be thrown due to an invalid regex");
|
||||
} catch (IllegalArgumentException ignored) {
|
||||
}
|
||||
|
||||
try {
|
||||
request.setFileSize(fileSizeExpressions);
|
||||
fail("An error should be thrown due to multiple exact match expression");
|
||||
} catch (IllegalArgumentException ignored) {
|
||||
}
|
||||
}
|
||||
|
||||
private FakeNodeFileController createFileController() {
|
||||
FileStatus appDir = new FileStatus();
|
||||
appDir.setPath(new Path(String.format("test/%s", app.toString())));
|
||||
FileStatus appDir2 = new FileStatus();
|
||||
appDir2.setPath(new Path(String.format("test/%s", app2.toString())));
|
||||
List<FileStatus> appDirs = new ArrayList<>();
|
||||
appDirs.add(appDir);
|
||||
appDirs.add(appDir2);
|
||||
|
||||
FileStatus nodeFile = new FileStatus();
|
||||
nodeFile.setPath(new Path(String.format("test/%s", TEST_NODE)));
|
||||
FileStatus nodeFile2 = new FileStatus();
|
||||
nodeFile2.setPath(new Path(String.format("test/%s", TEST_NODE_2)));
|
||||
List<FileStatus> nodeFiles = new ArrayList<>();
|
||||
nodeFiles.add(nodeFile);
|
||||
nodeFiles.add(nodeFile2);
|
||||
|
||||
Map<ImmutablePair<String, String>, Map<String,
|
||||
List<ContainerLogFileInfo>>> internal = new HashMap<>();
|
||||
internal.put(new ImmutablePair<>(app.toString(), TEST_NODE),
|
||||
createLogFiles(TEST_NODE, attemptContainer));
|
||||
internal.put(new ImmutablePair<>(app.toString(), TEST_NODE_2),
|
||||
createLogFiles(TEST_NODE_2, attemptContainer2));
|
||||
internal.put(new ImmutablePair<>(app2.toString(), TEST_NODE),
|
||||
createLogFiles(TEST_NODE, attempt2Container));
|
||||
internal.put(new ImmutablePair<>(app2.toString(), TEST_NODE_2),
|
||||
createLogFiles(TEST_NODE_2, attempt2Container2));
|
||||
return new FakeNodeFileController(internal, appDirs, nodeFiles);
|
||||
}
|
||||
|
||||
private Map<String, List<ContainerLogFileInfo>> createLogFiles(
|
||||
String nodeId, ContainerId... containerId) {
|
||||
Map<String, List<ContainerLogFileInfo>> logFiles = new HashMap<>();
|
||||
for (ContainerId c : containerId) {
|
||||
|
||||
List<ContainerLogFileInfo> files = new ArrayList<>();
|
||||
ContainerLogFileInfo bigFile = new ContainerLogFileInfo();
|
||||
bigFile.setFileName(generateFileName(
|
||||
BIG_FILE_NAME, nodeId, c.toString()));
|
||||
bigFile.setFileSize("100");
|
||||
bigFile.setLastModifiedTime("1000");
|
||||
ContainerLogFileInfo smallFile = new ContainerLogFileInfo();
|
||||
smallFile.setFileName(generateFileName(
|
||||
SMALL_FILE_NAME, nodeId, c.toString()));
|
||||
smallFile.setFileSize("50");
|
||||
smallFile.setLastModifiedTime("100");
|
||||
files.add(bigFile);
|
||||
files.add(smallFile);
|
||||
|
||||
logFiles.put(c.toString(), files);
|
||||
}
|
||||
return logFiles;
|
||||
}
|
||||
|
||||
private String generateFileName(
|
||||
String name, String nodeId, String containerId) {
|
||||
return String.format("%s_%s_%s", name, nodeId, containerId);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.logaggregation.filecontroller;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||
import org.apache.hadoop.yarn.webapp.View;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class FakeLogAggregationFileController
|
||||
extends LogAggregationFileController {
|
||||
|
||||
@Override
|
||||
protected void initInternal(Configuration conf) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initializeWriter(LogAggregationFileControllerContext context)
|
||||
throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closeWriter() throws LogAggregationDFSException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(AggregatedLogFormat.LogKey logKey,
|
||||
AggregatedLogFormat.LogValue logValue) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postWrite(LogAggregationFileControllerContext record)
|
||||
throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
|
||||
OutputStream os) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerLogMeta> readAggregatedLogsMeta(
|
||||
ContainerLogsRequest logRequest) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renderAggregatedLogsBlock(HtmlBlock.Block html,
|
||||
View.ViewContext context) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getApplicationOwner(Path aggregatedLogPath,
|
||||
ApplicationId appId) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ApplicationAccessType, String> getApplicationAcls(
|
||||
Path aggregatedLogPath, ApplicationId appId) throws IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
|
||||
import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
||||
|
@ -497,4 +498,109 @@ public class TestLogAggregationIndexedFileController
|
|||
fileFormat.initialize(conf, fileControllerName);
|
||||
assertThat(fileFormat.getRollOverLogMaxSize(conf)).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetLogMetaFilesOfNode() throws Exception {
|
||||
if (fs.exists(rootLocalLogDirPath)) {
|
||||
fs.delete(rootLocalLogDirPath, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(rootLocalLogDirPath));
|
||||
|
||||
Path appLogsDir = new Path(rootLocalLogDirPath, appId.toString());
|
||||
if (fs.exists(appLogsDir)) {
|
||||
fs.delete(appLogsDir, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(appLogsDir));
|
||||
|
||||
List<String> logTypes = new ArrayList<String>();
|
||||
logTypes.add("syslog");
|
||||
logTypes.add("stdout");
|
||||
logTypes.add("stderr");
|
||||
|
||||
Set<File> files = new HashSet<>();
|
||||
|
||||
LogKey key1 = new LogKey(containerId.toString());
|
||||
|
||||
for(String logType : logTypes) {
|
||||
File file = createAndWriteLocalLogFile(containerId, appLogsDir,
|
||||
logType);
|
||||
files.add(file);
|
||||
}
|
||||
files.add(createZeroLocalLogFile(appLogsDir));
|
||||
|
||||
LogValue value = mock(LogValue.class);
|
||||
when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files);
|
||||
|
||||
LogAggregationIndexedFileController fileFormat =
|
||||
new LogAggregationIndexedFileController();
|
||||
|
||||
fileFormat.initialize(getConf(), "Indexed");
|
||||
|
||||
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
|
||||
Path appDir = fileFormat.getRemoteAppLogDir(appId,
|
||||
USER_UGI.getShortUserName());
|
||||
if (fs.exists(appDir)) {
|
||||
fs.delete(appDir, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(appDir));
|
||||
|
||||
Path logPath = fileFormat.getRemoteNodeLogFileForApp(
|
||||
appId, USER_UGI.getShortUserName(), nodeId);
|
||||
LogAggregationFileControllerContext context =
|
||||
new LogAggregationFileControllerContext(
|
||||
logPath, logPath, true, 1000, appId, appAcls, nodeId, USER_UGI);
|
||||
// initialize the writer
|
||||
fileFormat.initializeWriter(context);
|
||||
|
||||
fileFormat.write(key1, value);
|
||||
fileFormat.postWrite(context);
|
||||
fileFormat.closeWriter();
|
||||
|
||||
ContainerLogsRequest logRequest = new ContainerLogsRequest();
|
||||
logRequest.setAppId(appId);
|
||||
logRequest.setNodeId(nodeId.toString());
|
||||
logRequest.setAppOwner(USER_UGI.getShortUserName());
|
||||
logRequest.setContainerId(containerId.toString());
|
||||
logRequest.setBytes(Long.MAX_VALUE);
|
||||
// create a checksum file
|
||||
final ControlledClock clock = new ControlledClock();
|
||||
clock.setTime(System.currentTimeMillis());
|
||||
Path checksumFile = new Path(fileFormat.getRemoteAppLogDir(
|
||||
appId, USER_UGI.getShortUserName()),
|
||||
LogAggregationUtils.getNodeString(nodeId)
|
||||
+ LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
|
||||
FSDataOutputStream fInput = null;
|
||||
try {
|
||||
String nodeName = logPath.getName() + "_" + clock.getTime();
|
||||
fInput = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK);
|
||||
fInput.writeInt(nodeName.length());
|
||||
fInput.write(nodeName.getBytes(
|
||||
Charset.forName("UTF-8")));
|
||||
fInput.writeLong(0);
|
||||
} finally {
|
||||
IOUtils.closeQuietly(fInput);
|
||||
}
|
||||
|
||||
Path nodePath = LogAggregationUtils.getRemoteAppLogDir(
|
||||
fileFormat.getRemoteRootLogDir(), appId, USER_UGI.getShortUserName(),
|
||||
fileFormat.getRemoteRootLogDirSuffix());
|
||||
FileStatus[] nodes = fs.listStatus(nodePath);
|
||||
ExtendedLogMetaRequest req =
|
||||
new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder().build();
|
||||
for (FileStatus node : nodes) {
|
||||
Map<String, List<ContainerLogFileInfo>> metas =
|
||||
fileFormat.getLogMetaFilesOfNode(req, node, appId);
|
||||
|
||||
if (node.getPath().getName().contains(
|
||||
LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX)) {
|
||||
assertTrue("Checksum node files should not contain any logs",
|
||||
metas.isEmpty());
|
||||
} else {
|
||||
assertFalse("Non-checksum node files should contain log files",
|
||||
metas.isEmpty());
|
||||
assertEquals(4, metas.values().stream().findFirst().get().size());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,8 +31,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
|
||||
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationMetaCollector;
|
||||
import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.RemoteLogPathEntry;
|
||||
|
@ -264,6 +266,36 @@ public class LogServlet extends Configured {
|
|||
redirectedFromNode, null, manualRedirection);
|
||||
}
|
||||
|
||||
public Response getContainerLogsInfo(
|
||||
HttpServletRequest req,
|
||||
ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder logsRequest)
|
||||
throws IOException {
|
||||
List<ContainerLogMeta> logs = new ArrayList<>();
|
||||
|
||||
if (!logsRequest.isUserSet()) {
|
||||
logsRequest.setUser(UserGroupInformation.getCurrentUser().getUserName());
|
||||
}
|
||||
LogAggregationMetaCollector collector = new LogAggregationMetaCollector(
|
||||
logsRequest.build(), getConf());
|
||||
|
||||
for (LogAggregationFileController fc : getOrCreateFactory()
|
||||
.getConfiguredLogAggregationFileControllerList()) {
|
||||
logs.addAll(collector.collect(fc));
|
||||
}
|
||||
|
||||
List<ContainerLogsInfo> containersLogsInfo = convertToContainerLogsInfo(
|
||||
logs, false);
|
||||
GenericEntity<List<ContainerLogsInfo>> meta =
|
||||
new GenericEntity<List<ContainerLogsInfo>>(containersLogsInfo) {
|
||||
};
|
||||
Response.ResponseBuilder response = Response.ok(meta);
|
||||
// Sending the X-Content-Type-Options response header with the value
|
||||
// nosniff will prevent Internet Explorer from MIME-sniffing a response
|
||||
// away from the declared content-type.
|
||||
response.header("X-Content-Type-Options", "nosniff");
|
||||
return response.build();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns information about the logs for a specific container.
|
||||
|
|
|
@ -40,4 +40,6 @@ public interface YarnWebServiceParams {
|
|||
String CLUSTER_ID = "clusterid";
|
||||
String MANUAL_REDIRECTION = "manual_redirection";
|
||||
String REMOTE_USER = "user";
|
||||
String FILESIZE = "file_size";
|
||||
String MODIFICATION_TIME = "modification_time";
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue