YARN-7891. LogAggregationIndexedFileController should support read from HAR file. (Xuan Gong via wangda)

Change-Id: Ie16e34039d57df50128c73b37516ad0bc7c9590e
This commit is contained in:
Wangda Tan 2018-03-07 10:40:31 -08:00
parent 88fba00caa
commit 4d53ef7eef
21 changed files with 646 additions and 26 deletions

View File

@ -249,6 +249,10 @@
<exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude>
<exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude>
<exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude>
<exclude>src/test/resources/application_123456_0001.har/_index</exclude>
<exclude>src/test/resources/application_123456_0001.har/part-0</exclude>
<exclude>src/test/resources/application_123456_0001.har/_masterindex</exclude>
<exclude>src/test/resources/application_123456_0001.har/_SUCCESS</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -495,16 +495,21 @@ public class LogAggregationIndexedFileController
boolean getAllContainers = (containerIdStr == null
|| containerIdStr.isEmpty());
long size = logRequest.getBytes();
List<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(),
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(),
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
if (nodeFiles.isEmpty()) {
if (!nodeFiles.hasNext()) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
if (allFiles.isEmpty()) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
List<FileStatus> fileToRead = getNodeLogFileToRead(
nodeFiles, nodeIdStr, appId);
allFiles, nodeIdStr, appId);
byte[] buf = new byte[65535];
for (FileStatus thisNodeFile : fileToRead) {
String nodeName = thisNodeFile.getPath().getName();
@ -609,16 +614,21 @@ public class LogAggregationIndexedFileController
containerIdStr.isEmpty());
String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
: LogAggregationUtils.getNodeString(nodeId);
List<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir,
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir,
this.remoteRootLogDirSuffix);
if (nodeFiles.isEmpty()) {
if (!nodeFiles.hasNext()) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
if (allFiles.isEmpty()) {
throw new IOException("There is no available log fils for "
+ "application:" + appId);
}
Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
List<FileStatus> fileToRead = getNodeLogFileToRead(
nodeFiles, nodeIdStr, appId);
allFiles, nodeIdStr, appId);
for(FileStatus thisNodeFile : fileToRead) {
try {
Long checkSumIndex = checkSumFiles.get(
@ -727,25 +737,37 @@ public class LogAggregationIndexedFileController
List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
throws IOException {
List<FileStatus> listOfFiles = new ArrayList<>();
List<FileStatus> files = new ArrayList<>(nodeFiles);
for (FileStatus file : files) {
String nodeName = file.getPath().getName();
for (FileStatus thisNodeFile : nodeFiles) {
String nodeName = thisNodeFile.getPath().getName();
if ((nodeId == null || nodeId.isEmpty()
|| nodeName.contains(LogAggregationUtils
.getNodeString(nodeId))) && !nodeName.endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX) &&
!nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
if (nodeName.equals(appId + ".har")) {
Path p = new Path("har:///" + file.getPath().toUri().getRawPath());
files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p));
continue;
}
listOfFiles.add(file);
listOfFiles.add(thisNodeFile);
}
}
return listOfFiles;
}
private List<FileStatus> getAllNodeFiles(
RemoteIterator<FileStatus> nodeFiles, ApplicationId appId)
throws IOException {
List<FileStatus> listOfFiles = new ArrayList<>();
while (nodeFiles != null && nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
String nodeName = thisNodeFile.getPath().getName();
if (nodeName.equals(appId + ".har")) {
Path p = new Path("har:///"
+ thisNodeFile.getPath().toUri().getRawPath());
nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
continue;
}
listOfFiles.add(thisNodeFile);
}
return listOfFiles;
}
@Private
public FileStatus getAllChecksumFiles(Map<String, FileStatus> fileMap,
String fileName) {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
@ -27,6 +28,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.io.Writer;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
@ -364,6 +366,58 @@ public class TestLogAggregationIndexFileController {
sysOutStream.reset();
}
@Test(timeout = 15000)
public void testFetchApplictionLogsHar() throws Exception {
List<String> newLogTypes = new ArrayList<>();
newLogTypes.add("syslog");
newLogTypes.add("stdout");
newLogTypes.add("stderr");
newLogTypes.add("test1");
newLogTypes.add("test2");
URL harUrl = ClassLoader.getSystemClassLoader()
.getResource("application_123456_0001.har");
assertNotNull(harUrl);
Path path = new Path(remoteLogDir + "/" + USER_UGI.getShortUserName()
+ "/logs/application_123456_0001");
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
Path harPath = new Path(path, "application_123456_0001.har");
fs.copyFromLocalFile(false, new Path(harUrl.toURI()), harPath);
assertTrue(fs.exists(harPath));
LogAggregationIndexedFileController fileFormat
= new LogAggregationIndexedFileController();
fileFormat.initialize(conf, "Indexed");
ContainerLogsRequest logRequest = new ContainerLogsRequest();
logRequest.setAppId(appId);
logRequest.setNodeId(nodeId.toString());
logRequest.setAppOwner(USER_UGI.getShortUserName());
logRequest.setContainerId(containerId.toString());
logRequest.setBytes(Long.MAX_VALUE);
List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
logRequest);
Assert.assertEquals(meta.size(), 3);
List<String> fileNames = new ArrayList<>();
for (ContainerLogMeta log : meta) {
Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
fileNames.add(file.getFileName());
}
}
fileNames.removeAll(newLogTypes);
Assert.assertTrue(fileNames.isEmpty());
boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
Assert.assertTrue(foundLogs);
for (String logType : newLogTypes) {
Assert.assertTrue(sysOutStream.toString().contains(logMessage(
containerId, logType)));
}
sysOutStream.reset();
}
private File createAndWriteLocalLogFile(ContainerId containerId,
Path localLogDir, String logType) throws IOException {
File file = new File(localLogDir.toString(), logType);

View File

@ -0,0 +1,3 @@
%2F dir 1517728311922+493+xuan+supergroup 0 0 localhost_9999_1517727665265 localhost_9999_1517727668513
%2Flocalhost_9999_1517727665265 file part-0 0 2895 1517728301581+420+xuan+supergroup
%2Flocalhost_9999_1517727668513 file part-0 2895 1228 1517728311919+420+xuan+supergroup

View File

@ -112,4 +112,9 @@ public abstract class RegisterNodeManagerRequest {
* @param physicalResource Physical resources in the node.
*/
public abstract void setPhysicalResource(Resource physicalResource);
public abstract List<LogAggregationReport> getLogAggregationReportsForApps();
public abstract void setLogAggregationReportsForApps(
List<LogAggregationReport> logAggregationReportsForApps);
}

View File

@ -38,11 +38,13 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@ -57,6 +59,8 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
private List<ApplicationId> runningApplications = null;
private Set<NodeLabel> labels = null;
private List<LogAggregationReport> logAggregationReportsForApps = null;
/** Physical resources in the node. */
private Resource physicalResource = null;
@ -100,6 +104,48 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
if (this.physicalResource != null) {
builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
}
if (this.logAggregationReportsForApps != null) {
addLogAggregationStatusForAppsToProto();
}
}
private void addLogAggregationStatusForAppsToProto() {
maybeInitBuilder();
builder.clearLogAggregationReportsForApps();
if (this.logAggregationReportsForApps == null) {
return;
}
Iterable<LogAggregationReportProto> it =
new Iterable<LogAggregationReportProto>() {
@Override
public Iterator<LogAggregationReportProto> iterator() {
return new Iterator<LogAggregationReportProto>() {
private Iterator<LogAggregationReport> iter =
logAggregationReportsForApps.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public LogAggregationReportProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllLogAggregationReportsForApps(it);
}
private LogAggregationReportProto convertToProtoFormat(
LogAggregationReport value) {
return ((LogAggregationReportPBImpl) value).getProto();
}
private synchronized void addNMContainerStatusesToProto() {
@ -400,4 +446,37 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
NMContainerStatus c) {
return ((NMContainerStatusPBImpl)c).getProto();
}
@Override
public List<LogAggregationReport> getLogAggregationReportsForApps() {
if (this.logAggregationReportsForApps != null) {
return this.logAggregationReportsForApps;
}
initLogAggregationReportsForApps();
return logAggregationReportsForApps;
}
private void initLogAggregationReportsForApps() {
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
List<LogAggregationReportProto> list =
p.getLogAggregationReportsForAppsList();
this.logAggregationReportsForApps = new ArrayList<LogAggregationReport>();
for (LogAggregationReportProto c : list) {
this.logAggregationReportsForApps.add(convertFromProtoFormat(c));
}
}
private LogAggregationReport convertFromProtoFormat(
LogAggregationReportProto logAggregationReport) {
return new LogAggregationReportPBImpl(logAggregationReport);
}
@Override
public void setLogAggregationReportsForApps(
List<LogAggregationReport> logAggregationStatusForApps) {
if(logAggregationStatusForApps == null) {
builder.clearLogAggregationReportsForApps();
}
this.logAggregationReportsForApps = logAggregationStatusForApps;
}
}

View File

@ -66,6 +66,7 @@ message RegisterNodeManagerRequestProto {
repeated ApplicationIdProto runningApplications = 7;
optional NodeLabelsProto nodeLabels = 8;
optional ResourceProto physicalResource = 9;
repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
}
message RegisterNodeManagerResponseProto {

View File

@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@ -121,6 +121,8 @@ public interface Context {
NMTimelinePublisher getNMTimelinePublisher();
NMLogAggregationStatusTracker getNMLogAggregationStatusTracker();
ContainerExecutor getContainerExecutor();
ContainerStateTransitionListener getContainerStateTransitionListener();

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
@ -621,6 +622,8 @@ public class NodeManager extends CompositeService
private ResourcePluginManager resourcePluginManager;
private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@ -862,6 +865,15 @@ public class NodeManager extends CompositeService
public void setDeletionService(DeletionService deletionService) {
this.deletionService = deletionService;
}
public void setNMLogAggregationStatusTracker(
NMLogAggregationStatusTracker nmLogAggregationStatusTracker) {
this.nmLogAggregationStatusTracker = nmLogAggregationStatusTracker;
}
@Override
public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
return nmLogAggregationStatusTracker;
}
}
/**

View File

@ -381,6 +381,20 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
}
if (logAggregationEnabled) {
// pull log aggregation status for application running in this NM
List<LogAggregationReport> logAggregationReports =
context.getNMLogAggregationStatusTracker()
.pullCachedLogAggregationReports();
if (LOG.isDebugEnabled()) {
LOG.debug("The cache log aggregation status size:"
+ logAggregationReports.size());
}
if (logAggregationReports != null
&& !logAggregationReports.isEmpty()) {
request.setLogAggregationReportsForApps(logAggregationReports);
}
}
regNMResponse =
resourceTracker.registerNodeManager(request);
// Make sure rmIdentifier is set before we release the lock

View File

@ -109,6 +109,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
@ -138,6 +139,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
@ -226,6 +228,8 @@ public class ContainerManagerImpl extends CompositeService implements
// NM metrics publisher is set only if the timeline service v.2 is enabled
private NMTimelinePublisher nmMetricsPublisher;
private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
@ -283,6 +287,10 @@ public class ContainerManagerImpl extends CompositeService implements
addService(dispatcher);
this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(
context);
((NMContext)context).setNMLogAggregationStatusTracker(
this.nmLogAggregationStatusTracker);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@ -558,6 +566,11 @@ public class ContainerManagerImpl extends CompositeService implements
return nmTimelinePublisherLocal;
}
protected NMLogAggregationStatusTracker createNMLogAggregationStatusTracker(
Context ctxt) {
return new NMLogAggregationStatusTracker(ctxt);
}
protected ContainersLauncher createContainersLauncher(Context context,
ContainerExecutor exec) {
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
@ -653,6 +666,7 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
this.nmLogAggregationStatusTracker.start();
LOG.info("ContainerManager started at " + connectAddress);
LOG.info("ContainerManager bound to " + initialAddress);
}
@ -691,6 +705,7 @@ public class ContainerManagerImpl extends CompositeService implements
server.stop();
}
super.serviceStop();
this.nmLogAggregationStatusTracker.stop();
}
public void cleanUpApplicationsOnNMShutDown() {

View File

@ -385,7 +385,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
logAggregationSucceedInThisCycle
? LogAggregationStatus.RUNNING
: LogAggregationStatus.RUNNING_WITH_FAILURE;
sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage);
sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage,
false);
if (appFinished) {
// If the app is finished, one extra final report with log aggregation
// status SUCCEEDED/FAILED will be sent to RM to inform the RM
@ -394,18 +395,22 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
? LogAggregationStatus.FAILED
: LogAggregationStatus.SUCCEEDED;
sendLogAggregationReportInternal(finalLogAggregationStatus, "");
sendLogAggregationReportInternal(finalLogAggregationStatus, "", true);
}
}
private void sendLogAggregationReportInternal(
LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
LogAggregationStatus logAggregationStatus, String diagnosticMessage,
boolean finalized) {
LogAggregationReport report =
Records.newRecord(LogAggregationReport.class);
report.setApplicationId(appId);
report.setDiagnosticMessage(diagnosticMessage);
report.setLogAggregationStatus(logAggregationStatus);
this.context.getLogAggregationStatusForApps().add(report);
this.context.getNMLogAggregationStatusTracker().updateLogAggregationStatus(
appId, logAggregationStatus, System.currentTimeMillis(),
diagnosticMessage, finalized);
}
@SuppressWarnings("unchecked")

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NMLogAggregationStatusTracker {
private static final Logger LOG =
LoggerFactory.getLogger(NMLogAggregationStatusTracker.class);
private final ReadLock updateLocker;
private final WriteLock pullLocker;
private final Context nmContext;
private final long rollingInterval;
private final Timer timer;
private final Map<ApplicationId, LogAggregationTrakcer> trackers;
private boolean disabled = false;
public NMLogAggregationStatusTracker(Context context) {
this.nmContext = context;
Configuration conf = context.getConf();
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
disabled = true;
}
this.trackers = new HashMap<>();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.updateLocker = lock.readLock();
this.pullLocker = lock.writeLock();
this.timer = new Timer();
this.rollingInterval = conf.getLong(
YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
LOG.info("the rolling interval seconds for the NodeManager Cached Log "
+ "aggregation status is " + (rollingInterval/1000));
}
public void start() {
if (disabled) {
LOG.warn("Log Aggregation is disabled."
+ "So is the LogAggregationStatusTracker.");
} else {
this.timer.scheduleAtFixedRate(new LogAggregationStatusRoller(),
rollingInterval, rollingInterval);
}
}
public void stop() {
this.timer.cancel();
}
public void updateLogAggregationStatus(ApplicationId appId,
LogAggregationStatus logAggregationStatus, long updateTime,
String diagnosis, boolean finalized) {
if (disabled) {
LOG.warn("The log aggregation is diabled. No need to update "
+ "the log aggregation status");
}
this.updateLocker.lock();
try {
LogAggregationTrakcer tracker = trackers.get(appId);
if (tracker == null) {
Application application = this.nmContext.getApplications().get(appId);
if (application == null) {
// the application has already finished or
// this application is unknown application.
// Check the log aggregation status update time, if the update time is
// still in the period of timeout, we add it to the trackers map.
// Otherwise, we ignore it.
long currentTime = System.currentTimeMillis();
if (currentTime - updateTime > rollingInterval) {
LOG.warn("Ignore the log aggregation status update request "
+ "for the application:" + appId + ". The log aggregation status"
+ " update time is " + updateTime + " while the request process "
+ "time is " + currentTime + ".");
return;
}
}
LogAggregationTrakcer newTracker = new LogAggregationTrakcer(
logAggregationStatus, diagnosis);
newTracker.setLastModifiedTime(updateTime);
newTracker.setFinalized(finalized);
trackers.put(appId, newTracker);
} else {
if (tracker.isFinalized()) {
LOG.warn("Ignore the log aggregation status update request "
+ "for the application:" + appId + ". The cached log aggregation "
+ "status is " + tracker.getLogAggregationStatus() + ".");
} else {
if (tracker.getLastModifiedTime() > updateTime) {
LOG.warn("Ignore the log aggregation status update request "
+ "for the application:" + appId + ". The request log "
+ "aggregation status update is older than the cached "
+ "log aggregation status.");
} else {
tracker.setLogAggregationStatus(logAggregationStatus);
tracker.setDiagnosis(diagnosis);
tracker.setLastModifiedTime(updateTime);
tracker.setFinalized(finalized);
trackers.put(appId, tracker);
}
}
}
} finally {
this.updateLocker.unlock();
}
}
public List<LogAggregationReport> pullCachedLogAggregationReports() {
List<LogAggregationReport> reports = new ArrayList<>();
if (disabled) {
LOG.warn("The log aggregation is diabled."
+ "There is no cached log aggregation status.");
return reports;
}
this.pullLocker.lock();
try {
for(Entry<ApplicationId, LogAggregationTrakcer> tracker :
trackers.entrySet()) {
LogAggregationTrakcer current = tracker.getValue();
LogAggregationReport report = LogAggregationReport.newInstance(
tracker.getKey(), current.getLogAggregationStatus(),
current.getDiagnosis());
reports.add(report);
}
return reports;
} finally {
this.pullLocker.unlock();
}
}
private class LogAggregationStatusRoller extends TimerTask {
@Override
public void run() {
rollLogAggregationStatus();
}
}
@Private
void rollLogAggregationStatus() {
this.pullLocker.lock();
try {
long currentTimeStamp = System.currentTimeMillis();
LOG.info("Rolling over the cached log aggregation status.");
Iterator<Entry<ApplicationId, LogAggregationTrakcer>> it = trackers
.entrySet().iterator();
while (it.hasNext()) {
Entry<ApplicationId, LogAggregationTrakcer> tracker = it.next();
// the application has finished.
if (nmContext.getApplications().get(tracker.getKey()) == null) {
if (currentTimeStamp - tracker.getValue().getLastModifiedTime()
> rollingInterval) {
it.remove();
}
}
}
} finally {
this.pullLocker.unlock();
}
}
private static class LogAggregationTrakcer {
private LogAggregationStatus logAggregationStatus;
private long lastModifiedTime;
private boolean finalized;
private String diagnosis;
public LogAggregationTrakcer(
LogAggregationStatus logAggregationStatus, String diagnosis) {
this.setLogAggregationStatus(logAggregationStatus);
this.setDiagnosis(diagnosis);
}
public LogAggregationStatus getLogAggregationStatus() {
return logAggregationStatus;
}
public void setLogAggregationStatus(
LogAggregationStatus logAggregationStatus) {
this.logAggregationStatus = logAggregationStatus;
}
public long getLastModifiedTime() {
return lastModifiedTime;
}
public void setLastModifiedTime(long lastModifiedTime) {
this.lastModifiedTime = lastModifiedTime;
}
public boolean isFinalized() {
return finalized;
}
public void setFinalized(boolean finalized) {
this.finalized = finalized;
}
public String getDiagnosis() {
return diagnosis;
}
public void setDiagnosis(String diagnosis) {
this.diagnosis = diagnosis;
}
}
}

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
@ -814,5 +815,10 @@ public abstract class BaseAMRMProxyTest {
public DeletionService getDeletionService() {
return null;
}
@Override
public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
return null;
}
}
}

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.junit.Assert;
import org.junit.Test;
public class TestNMLogAggregationStatusTracker {
@Test
public void testNMLogAggregationStatusUpdate() {
Context mockContext = mock(Context.class);
ConcurrentMap<ApplicationId, Application> apps = new ConcurrentHashMap<>();
when(mockContext.getApplications()).thenReturn(apps);
// the log aggregation is disabled.
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
when(mockContext.getConf()).thenReturn(conf);
NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker(
mockContext);
ApplicationId appId0 = ApplicationId.newInstance(0, 0);
tracker.updateLogAggregationStatus(appId0,
LogAggregationStatus.RUNNING, System.currentTimeMillis(), "", false);
List<LogAggregationReport> reports = tracker
.pullCachedLogAggregationReports();
// we can not get any cached log aggregation status because
// the log aggregation is disabled.
Assert.assertTrue(reports.isEmpty());
// enable the log aggregation.
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
when(mockContext.getConf()).thenReturn(conf);
tracker = new NMLogAggregationStatusTracker(mockContext);
// update the log aggregation status for an un-existed application
// the update time is not in the period of timeout.
// So, we should not cache the log application status for this
// application.
appId0 = ApplicationId.newInstance(0, 0);
tracker.updateLogAggregationStatus(appId0,
LogAggregationStatus.RUNNING,
System.currentTimeMillis() - 15 * 60 * 1000, "", false);
reports = tracker
.pullCachedLogAggregationReports();
Assert.assertTrue(reports.isEmpty());
tracker.updateLogAggregationStatus(appId0,
LogAggregationStatus.RUNNING,
System.currentTimeMillis() - 60 * 1000, "", false);
reports = tracker
.pullCachedLogAggregationReports();
Assert.assertTrue(reports.size() == 1);
Assert.assertTrue(reports.get(0).getLogAggregationStatus()
== LogAggregationStatus.RUNNING);
tracker.updateLogAggregationStatus(appId0,
LogAggregationStatus.SUCCEEDED,
System.currentTimeMillis() - 1 * 60 * 1000, "", true);
reports = tracker
.pullCachedLogAggregationReports();
Assert.assertTrue(reports.size() == 1);
Assert.assertTrue(reports.get(0).getLogAggregationStatus()
== LogAggregationStatus.SUCCEEDED);
// the log aggregation status is finalized. So, we would
// ingore the following update
tracker.updateLogAggregationStatus(appId0,
LogAggregationStatus.FAILED,
System.currentTimeMillis() - 1 * 60 * 1000, "", true);
reports = tracker
.pullCachedLogAggregationReports();
Assert.assertTrue(reports.size() == 1);
Assert.assertTrue(reports.get(0).getLogAggregationStatus()
== LogAggregationStatus.SUCCEEDED);
}
public void testLogAggregationStatusRoller() throws InterruptedException {
Context mockContext = mock(Context.class);
Configuration conf = new YarnConfiguration();
conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
10 * 1000);
when(mockContext.getConf()).thenReturn(conf);
NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker(
mockContext);
ApplicationId appId0 = ApplicationId.newInstance(0, 0);
tracker.updateLogAggregationStatus(appId0,
LogAggregationStatus.RUNNING,
System.currentTimeMillis(), "", false);
// sleep 10s
Thread.sleep(10*1000);
// the cache log aggregation status should be deleted.
List<LogAggregationReport> reports = tracker
.pullCachedLogAggregationReports();
Assert.assertTrue(reports.size() == 0);
}
}

View File

@ -399,9 +399,21 @@ public class ResourceTrackerService extends AbstractService implements
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId,
request.getNMContainerStatuses(),
request.getRunningApplications());
if (request.getLogAggregationReportsForApps() != null
&& !request.getLogAggregationReportsForApps().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found the number of previous cached log aggregation "
+ "status from nodemanager:" + nodeId + " is :"
+ request.getLogAggregationReportsForApps().size());
}
startEvent.setLogAggregationReportsForApps(request
.getLogAggregationReportsForApps());
}
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
request.getRunningApplications()));
startEvent);
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
@ -426,7 +438,6 @@ public class ResourceTrackerService extends AbstractService implements
this.rmContext.getRMNodes().put(nodeId, rmNode);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeStartedEvent(nodeId, null, null));
} else {
// Reset heartbeat ID since node just restarted.
oldNode.resetLastNodeHeartBeatResponse();

View File

@ -866,6 +866,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
List<LogAggregationReport> logAggregationReportsForApps =
startEvent.getLogAggregationReportsForApps();
if (logAggregationReportsForApps != null
&& !logAggregationReportsForApps.isEmpty()) {
rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
}
}
}

View File

@ -22,12 +22,14 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
public class RMNodeStartedEvent extends RMNodeEvent {
private List<NMContainerStatus> containerStatuses;
private List<ApplicationId> runningApplications;
private List<LogAggregationReport> logAggregationReportsForApps;
public RMNodeStartedEvent(NodeId nodeId,
List<NMContainerStatus> containerReports,
@ -44,4 +46,13 @@ public class RMNodeStartedEvent extends RMNodeEvent {
public List<ApplicationId> getRunningApplications() {
return runningApplications;
}
public List<LogAggregationReport> getLogAggregationReportsForApps() {
return this.logAggregationReportsForApps;
}
public void setLogAggregationReportsForApps(
List<LogAggregationReport> logAggregationReportsForApps) {
this.logAggregationReportsForApps = logAggregationReportsForApps;
}
}