Revert "YARN-7891. LogAggregationIndexedFileController should support read from HAR file. (Xuan Gong via wangda)"
This reverts commit 4d53ef7eef
.
This commit is contained in:
parent
19ae4429f9
commit
e718ac597f
|
@ -249,10 +249,6 @@
|
||||||
<exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude>
|
<exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude>
|
||||||
<exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude>
|
<exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude>
|
||||||
<exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude>
|
<exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude>
|
||||||
<exclude>src/test/resources/application_123456_0001.har/_index</exclude>
|
|
||||||
<exclude>src/test/resources/application_123456_0001.har/part-0</exclude>
|
|
||||||
<exclude>src/test/resources/application_123456_0001.har/_masterindex</exclude>
|
|
||||||
<exclude>src/test/resources/application_123456_0001.har/_SUCCESS</exclude>
|
|
||||||
</excludes>
|
</excludes>
|
||||||
</configuration>
|
</configuration>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
|
|
@ -495,21 +495,16 @@ public class LogAggregationIndexedFileController
|
||||||
boolean getAllContainers = (containerIdStr == null
|
boolean getAllContainers = (containerIdStr == null
|
||||||
|| containerIdStr.isEmpty());
|
|| containerIdStr.isEmpty());
|
||||||
long size = logRequest.getBytes();
|
long size = logRequest.getBytes();
|
||||||
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
|
List<FileStatus> nodeFiles = LogAggregationUtils
|
||||||
.getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(),
|
.getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(),
|
||||||
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
|
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
|
||||||
if (!nodeFiles.hasNext()) {
|
if (nodeFiles.isEmpty()) {
|
||||||
throw new IOException("There is no available log fils for "
|
throw new IOException("There is no available log fils for "
|
||||||
+ "application:" + appId);
|
+ "application:" + appId);
|
||||||
}
|
}
|
||||||
List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
|
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
|
||||||
if (allFiles.isEmpty()) {
|
|
||||||
throw new IOException("There is no available log fils for "
|
|
||||||
+ "application:" + appId);
|
|
||||||
}
|
|
||||||
Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
|
|
||||||
List<FileStatus> fileToRead = getNodeLogFileToRead(
|
List<FileStatus> fileToRead = getNodeLogFileToRead(
|
||||||
allFiles, nodeIdStr, appId);
|
nodeFiles, nodeIdStr, appId);
|
||||||
byte[] buf = new byte[65535];
|
byte[] buf = new byte[65535];
|
||||||
for (FileStatus thisNodeFile : fileToRead) {
|
for (FileStatus thisNodeFile : fileToRead) {
|
||||||
String nodeName = thisNodeFile.getPath().getName();
|
String nodeName = thisNodeFile.getPath().getName();
|
||||||
|
@ -614,21 +609,16 @@ public class LogAggregationIndexedFileController
|
||||||
containerIdStr.isEmpty());
|
containerIdStr.isEmpty());
|
||||||
String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
|
String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
|
||||||
: LogAggregationUtils.getNodeString(nodeId);
|
: LogAggregationUtils.getNodeString(nodeId);
|
||||||
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
|
List<FileStatus> nodeFiles = LogAggregationUtils
|
||||||
.getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir,
|
.getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir,
|
||||||
this.remoteRootLogDirSuffix);
|
this.remoteRootLogDirSuffix);
|
||||||
if (!nodeFiles.hasNext()) {
|
if (nodeFiles.isEmpty()) {
|
||||||
throw new IOException("There is no available log fils for "
|
throw new IOException("There is no available log fils for "
|
||||||
+ "application:" + appId);
|
+ "application:" + appId);
|
||||||
}
|
}
|
||||||
List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
|
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
|
||||||
if (allFiles.isEmpty()) {
|
|
||||||
throw new IOException("There is no available log fils for "
|
|
||||||
+ "application:" + appId);
|
|
||||||
}
|
|
||||||
Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
|
|
||||||
List<FileStatus> fileToRead = getNodeLogFileToRead(
|
List<FileStatus> fileToRead = getNodeLogFileToRead(
|
||||||
allFiles, nodeIdStr, appId);
|
nodeFiles, nodeIdStr, appId);
|
||||||
for(FileStatus thisNodeFile : fileToRead) {
|
for(FileStatus thisNodeFile : fileToRead) {
|
||||||
try {
|
try {
|
||||||
Long checkSumIndex = checkSumFiles.get(
|
Long checkSumIndex = checkSumFiles.get(
|
||||||
|
@ -737,33 +727,21 @@ public class LogAggregationIndexedFileController
|
||||||
List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
|
List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
List<FileStatus> listOfFiles = new ArrayList<>();
|
List<FileStatus> listOfFiles = new ArrayList<>();
|
||||||
for (FileStatus thisNodeFile : nodeFiles) {
|
List<FileStatus> files = new ArrayList<>(nodeFiles);
|
||||||
String nodeName = thisNodeFile.getPath().getName();
|
for (FileStatus file : files) {
|
||||||
|
String nodeName = file.getPath().getName();
|
||||||
if ((nodeId == null || nodeId.isEmpty()
|
if ((nodeId == null || nodeId.isEmpty()
|
||||||
|| nodeName.contains(LogAggregationUtils
|
|| nodeName.contains(LogAggregationUtils
|
||||||
.getNodeString(nodeId))) && !nodeName.endsWith(
|
.getNodeString(nodeId))) && !nodeName.endsWith(
|
||||||
LogAggregationUtils.TMP_FILE_SUFFIX) &&
|
LogAggregationUtils.TMP_FILE_SUFFIX) &&
|
||||||
!nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
|
!nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
|
||||||
listOfFiles.add(thisNodeFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return listOfFiles;
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<FileStatus> getAllNodeFiles(
|
|
||||||
RemoteIterator<FileStatus> nodeFiles, ApplicationId appId)
|
|
||||||
throws IOException {
|
|
||||||
List<FileStatus> listOfFiles = new ArrayList<>();
|
|
||||||
while (nodeFiles != null && nodeFiles.hasNext()) {
|
|
||||||
FileStatus thisNodeFile = nodeFiles.next();
|
|
||||||
String nodeName = thisNodeFile.getPath().getName();
|
|
||||||
if (nodeName.equals(appId + ".har")) {
|
if (nodeName.equals(appId + ".har")) {
|
||||||
Path p = new Path("har:///"
|
Path p = new Path("har:///" + file.getPath().toUri().getRawPath());
|
||||||
+ thisNodeFile.getPath().toUri().getRawPath());
|
files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p));
|
||||||
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
listOfFiles.add(thisNodeFile);
|
listOfFiles.add(file);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return listOfFiles;
|
return listOfFiles;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
|
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
@ -28,7 +27,6 @@ import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.net.URL;
|
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -366,58 +364,6 @@ public class TestLogAggregationIndexFileController {
|
||||||
sysOutStream.reset();
|
sysOutStream.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 15000)
|
|
||||||
public void testFetchApplictionLogsHar() throws Exception {
|
|
||||||
List<String> newLogTypes = new ArrayList<>();
|
|
||||||
newLogTypes.add("syslog");
|
|
||||||
newLogTypes.add("stdout");
|
|
||||||
newLogTypes.add("stderr");
|
|
||||||
newLogTypes.add("test1");
|
|
||||||
newLogTypes.add("test2");
|
|
||||||
URL harUrl = ClassLoader.getSystemClassLoader()
|
|
||||||
.getResource("application_123456_0001.har");
|
|
||||||
assertNotNull(harUrl);
|
|
||||||
|
|
||||||
Path path = new Path(remoteLogDir + "/" + USER_UGI.getShortUserName()
|
|
||||||
+ "/logs/application_123456_0001");
|
|
||||||
if (fs.exists(path)) {
|
|
||||||
fs.delete(path, true);
|
|
||||||
}
|
|
||||||
assertTrue(fs.mkdirs(path));
|
|
||||||
Path harPath = new Path(path, "application_123456_0001.har");
|
|
||||||
fs.copyFromLocalFile(false, new Path(harUrl.toURI()), harPath);
|
|
||||||
assertTrue(fs.exists(harPath));
|
|
||||||
LogAggregationIndexedFileController fileFormat
|
|
||||||
= new LogAggregationIndexedFileController();
|
|
||||||
fileFormat.initialize(conf, "Indexed");
|
|
||||||
ContainerLogsRequest logRequest = new ContainerLogsRequest();
|
|
||||||
logRequest.setAppId(appId);
|
|
||||||
logRequest.setNodeId(nodeId.toString());
|
|
||||||
logRequest.setAppOwner(USER_UGI.getShortUserName());
|
|
||||||
logRequest.setContainerId(containerId.toString());
|
|
||||||
logRequest.setBytes(Long.MAX_VALUE);
|
|
||||||
List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
|
|
||||||
logRequest);
|
|
||||||
Assert.assertEquals(meta.size(), 3);
|
|
||||||
List<String> fileNames = new ArrayList<>();
|
|
||||||
for (ContainerLogMeta log : meta) {
|
|
||||||
Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
|
|
||||||
Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
|
|
||||||
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
|
|
||||||
fileNames.add(file.getFileName());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fileNames.removeAll(newLogTypes);
|
|
||||||
Assert.assertTrue(fileNames.isEmpty());
|
|
||||||
boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
|
|
||||||
Assert.assertTrue(foundLogs);
|
|
||||||
for (String logType : newLogTypes) {
|
|
||||||
Assert.assertTrue(sysOutStream.toString().contains(logMessage(
|
|
||||||
containerId, logType)));
|
|
||||||
}
|
|
||||||
sysOutStream.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
private File createAndWriteLocalLogFile(ContainerId containerId,
|
private File createAndWriteLocalLogFile(ContainerId containerId,
|
||||||
Path localLogDir, String logType) throws IOException {
|
Path localLogDir, String logType) throws IOException {
|
||||||
File file = new File(localLogDir.toString(), logType);
|
File file = new File(localLogDir.toString(), logType);
|
||||||
|
|
|
@ -1,3 +0,0 @@
|
||||||
%2F dir 1517728311922+493+xuan+supergroup 0 0 localhost_9999_1517727665265 localhost_9999_1517727668513
|
|
||||||
%2Flocalhost_9999_1517727665265 file part-0 0 2895 1517728301581+420+xuan+supergroup
|
|
||||||
%2Flocalhost_9999_1517727668513 file part-0 2895 1228 1517728311919+420+xuan+supergroup
|
|
|
@ -1,2 +0,0 @@
|
||||||
3
|
|
||||||
0 1897968749 0 280
|
|
Binary file not shown.
|
@ -112,9 +112,4 @@ public abstract class RegisterNodeManagerRequest {
|
||||||
* @param physicalResource Physical resources in the node.
|
* @param physicalResource Physical resources in the node.
|
||||||
*/
|
*/
|
||||||
public abstract void setPhysicalResource(Resource physicalResource);
|
public abstract void setPhysicalResource(Resource physicalResource);
|
||||||
|
|
||||||
public abstract List<LogAggregationReport> getLogAggregationReportsForApps();
|
|
||||||
|
|
||||||
public abstract void setLogAggregationReportsForApps(
|
|
||||||
List<LogAggregationReport> logAggregationReportsForApps);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,13 +38,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
|
|
||||||
|
@ -59,8 +57,6 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
||||||
private List<ApplicationId> runningApplications = null;
|
private List<ApplicationId> runningApplications = null;
|
||||||
private Set<NodeLabel> labels = null;
|
private Set<NodeLabel> labels = null;
|
||||||
|
|
||||||
private List<LogAggregationReport> logAggregationReportsForApps = null;
|
|
||||||
|
|
||||||
/** Physical resources in the node. */
|
/** Physical resources in the node. */
|
||||||
private Resource physicalResource = null;
|
private Resource physicalResource = null;
|
||||||
|
|
||||||
|
@ -104,48 +100,6 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
||||||
if (this.physicalResource != null) {
|
if (this.physicalResource != null) {
|
||||||
builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
|
builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
|
||||||
}
|
}
|
||||||
if (this.logAggregationReportsForApps != null) {
|
|
||||||
addLogAggregationStatusForAppsToProto();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addLogAggregationStatusForAppsToProto() {
|
|
||||||
maybeInitBuilder();
|
|
||||||
builder.clearLogAggregationReportsForApps();
|
|
||||||
if (this.logAggregationReportsForApps == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Iterable<LogAggregationReportProto> it =
|
|
||||||
new Iterable<LogAggregationReportProto>() {
|
|
||||||
@Override
|
|
||||||
public Iterator<LogAggregationReportProto> iterator() {
|
|
||||||
return new Iterator<LogAggregationReportProto>() {
|
|
||||||
private Iterator<LogAggregationReport> iter =
|
|
||||||
logAggregationReportsForApps.iterator();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean hasNext() {
|
|
||||||
return iter.hasNext();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public LogAggregationReportProto next() {
|
|
||||||
return convertToProtoFormat(iter.next());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void remove() {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
};
|
|
||||||
builder.addAllLogAggregationReportsForApps(it);
|
|
||||||
}
|
|
||||||
|
|
||||||
private LogAggregationReportProto convertToProtoFormat(
|
|
||||||
LogAggregationReport value) {
|
|
||||||
return ((LogAggregationReportPBImpl) value).getProto();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addNMContainerStatusesToProto() {
|
private synchronized void addNMContainerStatusesToProto() {
|
||||||
|
@ -446,37 +400,4 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
||||||
NMContainerStatus c) {
|
NMContainerStatus c) {
|
||||||
return ((NMContainerStatusPBImpl)c).getProto();
|
return ((NMContainerStatusPBImpl)c).getProto();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<LogAggregationReport> getLogAggregationReportsForApps() {
|
|
||||||
if (this.logAggregationReportsForApps != null) {
|
|
||||||
return this.logAggregationReportsForApps;
|
|
||||||
}
|
|
||||||
initLogAggregationReportsForApps();
|
|
||||||
return logAggregationReportsForApps;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void initLogAggregationReportsForApps() {
|
|
||||||
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
|
|
||||||
List<LogAggregationReportProto> list =
|
|
||||||
p.getLogAggregationReportsForAppsList();
|
|
||||||
this.logAggregationReportsForApps = new ArrayList<LogAggregationReport>();
|
|
||||||
for (LogAggregationReportProto c : list) {
|
|
||||||
this.logAggregationReportsForApps.add(convertFromProtoFormat(c));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private LogAggregationReport convertFromProtoFormat(
|
|
||||||
LogAggregationReportProto logAggregationReport) {
|
|
||||||
return new LogAggregationReportPBImpl(logAggregationReport);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setLogAggregationReportsForApps(
|
|
||||||
List<LogAggregationReport> logAggregationStatusForApps) {
|
|
||||||
if(logAggregationStatusForApps == null) {
|
|
||||||
builder.clearLogAggregationReportsForApps();
|
|
||||||
}
|
|
||||||
this.logAggregationReportsForApps = logAggregationStatusForApps;
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -66,7 +66,6 @@ message RegisterNodeManagerRequestProto {
|
||||||
repeated ApplicationIdProto runningApplications = 7;
|
repeated ApplicationIdProto runningApplications = 7;
|
||||||
optional NodeLabelsProto nodeLabels = 8;
|
optional NodeLabelsProto nodeLabels = 8;
|
||||||
optional ResourceProto physicalResource = 9;
|
optional ResourceProto physicalResource = 9;
|
||||||
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message RegisterNodeManagerResponseProto {
|
message RegisterNodeManagerResponseProto {
|
||||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
@ -121,8 +121,6 @@ public interface Context {
|
||||||
|
|
||||||
NMTimelinePublisher getNMTimelinePublisher();
|
NMTimelinePublisher getNMTimelinePublisher();
|
||||||
|
|
||||||
NMLogAggregationStatusTracker getNMLogAggregationStatusTracker();
|
|
||||||
|
|
||||||
ContainerExecutor getContainerExecutor();
|
ContainerExecutor getContainerExecutor();
|
||||||
|
|
||||||
ContainerStateTransitionListener getContainerStateTransitionListener();
|
ContainerStateTransitionListener getContainerStateTransitionListener();
|
||||||
|
|
|
@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
|
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
|
||||||
|
@ -622,8 +621,6 @@ public class NodeManager extends CompositeService
|
||||||
|
|
||||||
private ResourcePluginManager resourcePluginManager;
|
private ResourcePluginManager resourcePluginManager;
|
||||||
|
|
||||||
private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
|
|
||||||
|
|
||||||
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
|
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
|
||||||
NMTokenSecretManagerInNM nmTokenSecretManager,
|
NMTokenSecretManagerInNM nmTokenSecretManager,
|
||||||
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
|
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
|
||||||
|
@ -865,15 +862,6 @@ public class NodeManager extends CompositeService
|
||||||
public void setDeletionService(DeletionService deletionService) {
|
public void setDeletionService(DeletionService deletionService) {
|
||||||
this.deletionService = deletionService;
|
this.deletionService = deletionService;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setNMLogAggregationStatusTracker(
|
|
||||||
NMLogAggregationStatusTracker nmLogAggregationStatusTracker) {
|
|
||||||
this.nmLogAggregationStatusTracker = nmLogAggregationStatusTracker;
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
|
|
||||||
return nmLogAggregationStatusTracker;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -381,20 +381,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
if (containerReports != null) {
|
if (containerReports != null) {
|
||||||
LOG.info("Registering with RM using containers :" + containerReports);
|
LOG.info("Registering with RM using containers :" + containerReports);
|
||||||
}
|
}
|
||||||
if (logAggregationEnabled) {
|
|
||||||
// pull log aggregation status for application running in this NM
|
|
||||||
List<LogAggregationReport> logAggregationReports =
|
|
||||||
context.getNMLogAggregationStatusTracker()
|
|
||||||
.pullCachedLogAggregationReports();
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("The cache log aggregation status size:"
|
|
||||||
+ logAggregationReports.size());
|
|
||||||
}
|
|
||||||
if (logAggregationReports != null
|
|
||||||
&& !logAggregationReports.isEmpty()) {
|
|
||||||
request.setLogAggregationReportsForApps(logAggregationReports);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
regNMResponse =
|
regNMResponse =
|
||||||
resourceTracker.registerNodeManager(request);
|
resourceTracker.registerNodeManager(request);
|
||||||
// Make sure rmIdentifier is set before we release the lock
|
// Make sure rmIdentifier is set before we release the lock
|
||||||
|
|
|
@ -109,7 +109,6 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
|
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
|
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
|
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
|
||||||
|
@ -139,7 +138,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
|
||||||
|
@ -228,8 +226,6 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
// NM metrics publisher is set only if the timeline service v.2 is enabled
|
// NM metrics publisher is set only if the timeline service v.2 is enabled
|
||||||
private NMTimelinePublisher nmMetricsPublisher;
|
private NMTimelinePublisher nmMetricsPublisher;
|
||||||
|
|
||||||
private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
|
|
||||||
|
|
||||||
public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
||||||
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
||||||
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
|
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
|
||||||
|
@ -287,10 +283,6 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
|
|
||||||
addService(dispatcher);
|
addService(dispatcher);
|
||||||
|
|
||||||
this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(
|
|
||||||
context);
|
|
||||||
((NMContext)context).setNMLogAggregationStatusTracker(
|
|
||||||
this.nmLogAggregationStatusTracker);
|
|
||||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
this.readLock = lock.readLock();
|
this.readLock = lock.readLock();
|
||||||
this.writeLock = lock.writeLock();
|
this.writeLock = lock.writeLock();
|
||||||
|
@ -566,11 +558,6 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
return nmTimelinePublisherLocal;
|
return nmTimelinePublisherLocal;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected NMLogAggregationStatusTracker createNMLogAggregationStatusTracker(
|
|
||||||
Context ctxt) {
|
|
||||||
return new NMLogAggregationStatusTracker(ctxt);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected ContainersLauncher createContainersLauncher(Context context,
|
protected ContainersLauncher createContainersLauncher(Context context,
|
||||||
ContainerExecutor exec) {
|
ContainerExecutor exec) {
|
||||||
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
|
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
|
||||||
|
@ -666,7 +653,6 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.nmLogAggregationStatusTracker.start();
|
|
||||||
LOG.info("ContainerManager started at " + connectAddress);
|
LOG.info("ContainerManager started at " + connectAddress);
|
||||||
LOG.info("ContainerManager bound to " + initialAddress);
|
LOG.info("ContainerManager bound to " + initialAddress);
|
||||||
}
|
}
|
||||||
|
@ -705,7 +691,6 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
this.nmLogAggregationStatusTracker.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void cleanUpApplicationsOnNMShutDown() {
|
public void cleanUpApplicationsOnNMShutDown() {
|
||||||
|
|
|
@ -385,8 +385,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
logAggregationSucceedInThisCycle
|
logAggregationSucceedInThisCycle
|
||||||
? LogAggregationStatus.RUNNING
|
? LogAggregationStatus.RUNNING
|
||||||
: LogAggregationStatus.RUNNING_WITH_FAILURE;
|
: LogAggregationStatus.RUNNING_WITH_FAILURE;
|
||||||
sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage,
|
sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage);
|
||||||
false);
|
|
||||||
if (appFinished) {
|
if (appFinished) {
|
||||||
// If the app is finished, one extra final report with log aggregation
|
// If the app is finished, one extra final report with log aggregation
|
||||||
// status SUCCEEDED/FAILED will be sent to RM to inform the RM
|
// status SUCCEEDED/FAILED will be sent to RM to inform the RM
|
||||||
|
@ -395,22 +394,18 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
|
renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
|
||||||
? LogAggregationStatus.FAILED
|
? LogAggregationStatus.FAILED
|
||||||
: LogAggregationStatus.SUCCEEDED;
|
: LogAggregationStatus.SUCCEEDED;
|
||||||
sendLogAggregationReportInternal(finalLogAggregationStatus, "", true);
|
sendLogAggregationReportInternal(finalLogAggregationStatus, "");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendLogAggregationReportInternal(
|
private void sendLogAggregationReportInternal(
|
||||||
LogAggregationStatus logAggregationStatus, String diagnosticMessage,
|
LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
|
||||||
boolean finalized) {
|
|
||||||
LogAggregationReport report =
|
LogAggregationReport report =
|
||||||
Records.newRecord(LogAggregationReport.class);
|
Records.newRecord(LogAggregationReport.class);
|
||||||
report.setApplicationId(appId);
|
report.setApplicationId(appId);
|
||||||
report.setDiagnosticMessage(diagnosticMessage);
|
report.setDiagnosticMessage(diagnosticMessage);
|
||||||
report.setLogAggregationStatus(logAggregationStatus);
|
report.setLogAggregationStatus(logAggregationStatus);
|
||||||
this.context.getLogAggregationStatusForApps().add(report);
|
this.context.getLogAggregationStatusForApps().add(report);
|
||||||
this.context.getNMLogAggregationStatusTracker().updateLogAggregationStatus(
|
|
||||||
appId, logAggregationStatus, System.currentTimeMillis(),
|
|
||||||
diagnosticMessage, finalized);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
|
@ -1,244 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Timer;
|
|
||||||
import java.util.TimerTask;
|
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public class NMLogAggregationStatusTracker {
|
|
||||||
|
|
||||||
private static final Logger LOG =
|
|
||||||
LoggerFactory.getLogger(NMLogAggregationStatusTracker.class);
|
|
||||||
|
|
||||||
private final ReadLock updateLocker;
|
|
||||||
private final WriteLock pullLocker;
|
|
||||||
private final Context nmContext;
|
|
||||||
private final long rollingInterval;
|
|
||||||
private final Timer timer;
|
|
||||||
private final Map<ApplicationId, LogAggregationTrakcer> trackers;
|
|
||||||
private boolean disabled = false;
|
|
||||||
|
|
||||||
public NMLogAggregationStatusTracker(Context context) {
|
|
||||||
this.nmContext = context;
|
|
||||||
Configuration conf = context.getConf();
|
|
||||||
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
|
||||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
|
|
||||||
disabled = true;
|
|
||||||
}
|
|
||||||
this.trackers = new HashMap<>();
|
|
||||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
||||||
this.updateLocker = lock.readLock();
|
|
||||||
this.pullLocker = lock.writeLock();
|
|
||||||
this.timer = new Timer();
|
|
||||||
this.rollingInterval = conf.getLong(
|
|
||||||
YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
|
|
||||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
|
|
||||||
LOG.info("the rolling interval seconds for the NodeManager Cached Log "
|
|
||||||
+ "aggregation status is " + (rollingInterval/1000));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start() {
|
|
||||||
if (disabled) {
|
|
||||||
LOG.warn("Log Aggregation is disabled."
|
|
||||||
+ "So is the LogAggregationStatusTracker.");
|
|
||||||
} else {
|
|
||||||
this.timer.scheduleAtFixedRate(new LogAggregationStatusRoller(),
|
|
||||||
rollingInterval, rollingInterval);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stop() {
|
|
||||||
this.timer.cancel();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void updateLogAggregationStatus(ApplicationId appId,
|
|
||||||
LogAggregationStatus logAggregationStatus, long updateTime,
|
|
||||||
String diagnosis, boolean finalized) {
|
|
||||||
if (disabled) {
|
|
||||||
LOG.warn("The log aggregation is diabled. No need to update "
|
|
||||||
+ "the log aggregation status");
|
|
||||||
}
|
|
||||||
this.updateLocker.lock();
|
|
||||||
try {
|
|
||||||
LogAggregationTrakcer tracker = trackers.get(appId);
|
|
||||||
if (tracker == null) {
|
|
||||||
Application application = this.nmContext.getApplications().get(appId);
|
|
||||||
if (application == null) {
|
|
||||||
// the application has already finished or
|
|
||||||
// this application is unknown application.
|
|
||||||
// Check the log aggregation status update time, if the update time is
|
|
||||||
// still in the period of timeout, we add it to the trackers map.
|
|
||||||
// Otherwise, we ignore it.
|
|
||||||
long currentTime = System.currentTimeMillis();
|
|
||||||
if (currentTime - updateTime > rollingInterval) {
|
|
||||||
LOG.warn("Ignore the log aggregation status update request "
|
|
||||||
+ "for the application:" + appId + ". The log aggregation status"
|
|
||||||
+ " update time is " + updateTime + " while the request process "
|
|
||||||
+ "time is " + currentTime + ".");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LogAggregationTrakcer newTracker = new LogAggregationTrakcer(
|
|
||||||
logAggregationStatus, diagnosis);
|
|
||||||
newTracker.setLastModifiedTime(updateTime);
|
|
||||||
newTracker.setFinalized(finalized);
|
|
||||||
trackers.put(appId, newTracker);
|
|
||||||
} else {
|
|
||||||
if (tracker.isFinalized()) {
|
|
||||||
LOG.warn("Ignore the log aggregation status update request "
|
|
||||||
+ "for the application:" + appId + ". The cached log aggregation "
|
|
||||||
+ "status is " + tracker.getLogAggregationStatus() + ".");
|
|
||||||
} else {
|
|
||||||
if (tracker.getLastModifiedTime() > updateTime) {
|
|
||||||
LOG.warn("Ignore the log aggregation status update request "
|
|
||||||
+ "for the application:" + appId + ". The request log "
|
|
||||||
+ "aggregation status update is older than the cached "
|
|
||||||
+ "log aggregation status.");
|
|
||||||
} else {
|
|
||||||
tracker.setLogAggregationStatus(logAggregationStatus);
|
|
||||||
tracker.setDiagnosis(diagnosis);
|
|
||||||
tracker.setLastModifiedTime(updateTime);
|
|
||||||
tracker.setFinalized(finalized);
|
|
||||||
trackers.put(appId, tracker);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
this.updateLocker.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<LogAggregationReport> pullCachedLogAggregationReports() {
|
|
||||||
List<LogAggregationReport> reports = new ArrayList<>();
|
|
||||||
if (disabled) {
|
|
||||||
LOG.warn("The log aggregation is diabled."
|
|
||||||
+ "There is no cached log aggregation status.");
|
|
||||||
return reports;
|
|
||||||
}
|
|
||||||
this.pullLocker.lock();
|
|
||||||
try {
|
|
||||||
for(Entry<ApplicationId, LogAggregationTrakcer> tracker :
|
|
||||||
trackers.entrySet()) {
|
|
||||||
LogAggregationTrakcer current = tracker.getValue();
|
|
||||||
LogAggregationReport report = LogAggregationReport.newInstance(
|
|
||||||
tracker.getKey(), current.getLogAggregationStatus(),
|
|
||||||
current.getDiagnosis());
|
|
||||||
reports.add(report);
|
|
||||||
}
|
|
||||||
return reports;
|
|
||||||
} finally {
|
|
||||||
this.pullLocker.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class LogAggregationStatusRoller extends TimerTask {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
rollLogAggregationStatus();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Private
|
|
||||||
void rollLogAggregationStatus() {
|
|
||||||
this.pullLocker.lock();
|
|
||||||
try {
|
|
||||||
long currentTimeStamp = System.currentTimeMillis();
|
|
||||||
LOG.info("Rolling over the cached log aggregation status.");
|
|
||||||
Iterator<Entry<ApplicationId, LogAggregationTrakcer>> it = trackers
|
|
||||||
.entrySet().iterator();
|
|
||||||
while (it.hasNext()) {
|
|
||||||
Entry<ApplicationId, LogAggregationTrakcer> tracker = it.next();
|
|
||||||
// the application has finished.
|
|
||||||
if (nmContext.getApplications().get(tracker.getKey()) == null) {
|
|
||||||
if (currentTimeStamp - tracker.getValue().getLastModifiedTime()
|
|
||||||
> rollingInterval) {
|
|
||||||
it.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
this.pullLocker.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class LogAggregationTrakcer {
|
|
||||||
private LogAggregationStatus logAggregationStatus;
|
|
||||||
private long lastModifiedTime;
|
|
||||||
private boolean finalized;
|
|
||||||
private String diagnosis;
|
|
||||||
|
|
||||||
public LogAggregationTrakcer(
|
|
||||||
LogAggregationStatus logAggregationStatus, String diagnosis) {
|
|
||||||
this.setLogAggregationStatus(logAggregationStatus);
|
|
||||||
this.setDiagnosis(diagnosis);
|
|
||||||
}
|
|
||||||
|
|
||||||
public LogAggregationStatus getLogAggregationStatus() {
|
|
||||||
return logAggregationStatus;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setLogAggregationStatus(
|
|
||||||
LogAggregationStatus logAggregationStatus) {
|
|
||||||
this.logAggregationStatus = logAggregationStatus;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getLastModifiedTime() {
|
|
||||||
return lastModifiedTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setLastModifiedTime(long lastModifiedTime) {
|
|
||||||
this.lastModifiedTime = lastModifiedTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isFinalized() {
|
|
||||||
return finalized;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setFinalized(boolean finalized) {
|
|
||||||
this.finalized = finalized;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getDiagnosis() {
|
|
||||||
return diagnosis;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setDiagnosis(String diagnosis) {
|
|
||||||
this.diagnosis = diagnosis;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
|
||||||
|
@ -815,10 +814,5 @@ public abstract class BaseAMRMProxyTest {
|
||||||
public DeletionService getDeletionService() {
|
public DeletionService getDeletionService() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,124 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker;
|
|
||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class TestNMLogAggregationStatusTracker {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testNMLogAggregationStatusUpdate() {
|
|
||||||
Context mockContext = mock(Context.class);
|
|
||||||
ConcurrentMap<ApplicationId, Application> apps = new ConcurrentHashMap<>();
|
|
||||||
when(mockContext.getApplications()).thenReturn(apps);
|
|
||||||
// the log aggregation is disabled.
|
|
||||||
Configuration conf = new YarnConfiguration();
|
|
||||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
|
|
||||||
when(mockContext.getConf()).thenReturn(conf);
|
|
||||||
NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker(
|
|
||||||
mockContext);
|
|
||||||
ApplicationId appId0 = ApplicationId.newInstance(0, 0);
|
|
||||||
tracker.updateLogAggregationStatus(appId0,
|
|
||||||
LogAggregationStatus.RUNNING, System.currentTimeMillis(), "", false);
|
|
||||||
List<LogAggregationReport> reports = tracker
|
|
||||||
.pullCachedLogAggregationReports();
|
|
||||||
// we can not get any cached log aggregation status because
|
|
||||||
// the log aggregation is disabled.
|
|
||||||
Assert.assertTrue(reports.isEmpty());
|
|
||||||
|
|
||||||
// enable the log aggregation.
|
|
||||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
|
||||||
when(mockContext.getConf()).thenReturn(conf);
|
|
||||||
tracker = new NMLogAggregationStatusTracker(mockContext);
|
|
||||||
// update the log aggregation status for an un-existed application
|
|
||||||
// the update time is not in the period of timeout.
|
|
||||||
// So, we should not cache the log application status for this
|
|
||||||
// application.
|
|
||||||
appId0 = ApplicationId.newInstance(0, 0);
|
|
||||||
tracker.updateLogAggregationStatus(appId0,
|
|
||||||
LogAggregationStatus.RUNNING,
|
|
||||||
System.currentTimeMillis() - 15 * 60 * 1000, "", false);
|
|
||||||
reports = tracker
|
|
||||||
.pullCachedLogAggregationReports();
|
|
||||||
Assert.assertTrue(reports.isEmpty());
|
|
||||||
|
|
||||||
tracker.updateLogAggregationStatus(appId0,
|
|
||||||
LogAggregationStatus.RUNNING,
|
|
||||||
System.currentTimeMillis() - 60 * 1000, "", false);
|
|
||||||
reports = tracker
|
|
||||||
.pullCachedLogAggregationReports();
|
|
||||||
Assert.assertTrue(reports.size() == 1);
|
|
||||||
Assert.assertTrue(reports.get(0).getLogAggregationStatus()
|
|
||||||
== LogAggregationStatus.RUNNING);
|
|
||||||
|
|
||||||
tracker.updateLogAggregationStatus(appId0,
|
|
||||||
LogAggregationStatus.SUCCEEDED,
|
|
||||||
System.currentTimeMillis() - 1 * 60 * 1000, "", true);
|
|
||||||
reports = tracker
|
|
||||||
.pullCachedLogAggregationReports();
|
|
||||||
Assert.assertTrue(reports.size() == 1);
|
|
||||||
Assert.assertTrue(reports.get(0).getLogAggregationStatus()
|
|
||||||
== LogAggregationStatus.SUCCEEDED);
|
|
||||||
|
|
||||||
// the log aggregation status is finalized. So, we would
|
|
||||||
// ingore the following update
|
|
||||||
tracker.updateLogAggregationStatus(appId0,
|
|
||||||
LogAggregationStatus.FAILED,
|
|
||||||
System.currentTimeMillis() - 1 * 60 * 1000, "", true);
|
|
||||||
reports = tracker
|
|
||||||
.pullCachedLogAggregationReports();
|
|
||||||
Assert.assertTrue(reports.size() == 1);
|
|
||||||
Assert.assertTrue(reports.get(0).getLogAggregationStatus()
|
|
||||||
== LogAggregationStatus.SUCCEEDED);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testLogAggregationStatusRoller() throws InterruptedException {
|
|
||||||
Context mockContext = mock(Context.class);
|
|
||||||
Configuration conf = new YarnConfiguration();
|
|
||||||
conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
|
|
||||||
10 * 1000);
|
|
||||||
when(mockContext.getConf()).thenReturn(conf);
|
|
||||||
NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker(
|
|
||||||
mockContext);
|
|
||||||
ApplicationId appId0 = ApplicationId.newInstance(0, 0);
|
|
||||||
tracker.updateLogAggregationStatus(appId0,
|
|
||||||
LogAggregationStatus.RUNNING,
|
|
||||||
System.currentTimeMillis(), "", false);
|
|
||||||
// sleep 10s
|
|
||||||
Thread.sleep(10*1000);
|
|
||||||
// the cache log aggregation status should be deleted.
|
|
||||||
List<LogAggregationReport> reports = tracker
|
|
||||||
.pullCachedLogAggregationReports();
|
|
||||||
Assert.assertTrue(reports.size() == 0);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -399,21 +399,9 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
|
|
||||||
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
|
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
|
||||||
if (oldNode == null) {
|
if (oldNode == null) {
|
||||||
RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId,
|
|
||||||
request.getNMContainerStatuses(),
|
|
||||||
request.getRunningApplications());
|
|
||||||
if (request.getLogAggregationReportsForApps() != null
|
|
||||||
&& !request.getLogAggregationReportsForApps().isEmpty()) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Found the number of previous cached log aggregation "
|
|
||||||
+ "status from nodemanager:" + nodeId + " is :"
|
|
||||||
+ request.getLogAggregationReportsForApps().size());
|
|
||||||
}
|
|
||||||
startEvent.setLogAggregationReportsForApps(request
|
|
||||||
.getLogAggregationReportsForApps());
|
|
||||||
}
|
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
startEvent);
|
new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
|
||||||
|
request.getRunningApplications()));
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Reconnect from the node at: " + host);
|
LOG.info("Reconnect from the node at: " + host);
|
||||||
this.nmLivelinessMonitor.unregister(nodeId);
|
this.nmLivelinessMonitor.unregister(nodeId);
|
||||||
|
@ -438,6 +426,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
this.rmContext.getRMNodes().put(nodeId, rmNode);
|
this.rmContext.getRMNodes().put(nodeId, rmNode);
|
||||||
this.rmContext.getDispatcher().getEventHandler()
|
this.rmContext.getDispatcher().getEventHandler()
|
||||||
.handle(new RMNodeStartedEvent(nodeId, null, null));
|
.handle(new RMNodeStartedEvent(nodeId, null, null));
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// Reset heartbeat ID since node just restarted.
|
// Reset heartbeat ID since node just restarted.
|
||||||
oldNode.resetLastNodeHeartBeatResponse();
|
oldNode.resetLastNodeHeartBeatResponse();
|
||||||
|
|
|
@ -866,12 +866,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||||
new NodesListManagerEvent(
|
new NodesListManagerEvent(
|
||||||
NodesListManagerEventType.NODE_USABLE, rmNode));
|
NodesListManagerEventType.NODE_USABLE, rmNode));
|
||||||
List<LogAggregationReport> logAggregationReportsForApps =
|
|
||||||
startEvent.getLogAggregationReportsForApps();
|
|
||||||
if (logAggregationReportsForApps != null
|
|
||||||
&& !logAggregationReportsForApps.isEmpty()) {
|
|
||||||
rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,14 +22,12 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
|
|
||||||
public class RMNodeStartedEvent extends RMNodeEvent {
|
public class RMNodeStartedEvent extends RMNodeEvent {
|
||||||
|
|
||||||
private List<NMContainerStatus> containerStatuses;
|
private List<NMContainerStatus> containerStatuses;
|
||||||
private List<ApplicationId> runningApplications;
|
private List<ApplicationId> runningApplications;
|
||||||
private List<LogAggregationReport> logAggregationReportsForApps;
|
|
||||||
|
|
||||||
public RMNodeStartedEvent(NodeId nodeId,
|
public RMNodeStartedEvent(NodeId nodeId,
|
||||||
List<NMContainerStatus> containerReports,
|
List<NMContainerStatus> containerReports,
|
||||||
|
@ -46,13 +44,4 @@ public class RMNodeStartedEvent extends RMNodeEvent {
|
||||||
public List<ApplicationId> getRunningApplications() {
|
public List<ApplicationId> getRunningApplications() {
|
||||||
return runningApplications;
|
return runningApplications;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<LogAggregationReport> getLogAggregationReportsForApps() {
|
|
||||||
return this.logAggregationReportsForApps;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setLogAggregationReportsForApps(
|
|
||||||
List<LogAggregationReport> logAggregationReportsForApps) {
|
|
||||||
this.logAggregationReportsForApps = logAggregationReportsForApps;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue