YARN-8586. Extract log aggregation related fields and methods from RMAppImpl. Contributed by Peter Bacsko
This commit is contained in:
parent
8fee3808c5
commit
df616370f0
|
@ -19,18 +19,14 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
|
@ -181,19 +177,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
new AppFinishedTransition();
|
||||
private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
|
||||
|
||||
private final boolean logAggregationEnabled;
|
||||
private long logAggregationStartTime = 0;
|
||||
private final long logAggregationStatusTimeout;
|
||||
private final Map<NodeId, LogAggregationReport> logAggregationStatus =
|
||||
new ConcurrentHashMap<NodeId, LogAggregationReport>();
|
||||
private volatile LogAggregationStatus logAggregationStatusForAppReport;
|
||||
private int logAggregationSucceed = 0;
|
||||
private int logAggregationFailed = 0;
|
||||
private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
|
||||
new HashMap<NodeId, List<String>>();
|
||||
private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
|
||||
new HashMap<NodeId, List<String>>();
|
||||
private final int maxLogAggregationDiagnosticsInMemory;
|
||||
private final RMAppLogAggregation logAggregation;
|
||||
private Map<ApplicationTimeoutType, Long> applicationTimeouts =
|
||||
new HashMap<ApplicationTimeoutType, Long>();
|
||||
|
||||
|
@ -510,26 +494,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
applicationSchedulingEnvs
|
||||
.putAll(submissionContext.getApplicationSchedulingPropertiesMap());
|
||||
|
||||
long localLogAggregationStatusTimeout =
|
||||
conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
|
||||
if (localLogAggregationStatusTimeout <= 0) {
|
||||
this.logAggregationStatusTimeout =
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
|
||||
} else {
|
||||
this.logAggregationStatusTimeout = localLogAggregationStatusTimeout;
|
||||
}
|
||||
this.logAggregationEnabled =
|
||||
conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
|
||||
if (this.logAggregationEnabled) {
|
||||
this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START;
|
||||
} else {
|
||||
this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED;
|
||||
}
|
||||
maxLogAggregationDiagnosticsInMemory = conf.getInt(
|
||||
YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
|
||||
YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
|
||||
this.logAggregation = new RMAppLogAggregation(conf, readLock, writeLock);
|
||||
|
||||
// amBlacklistingEnabled can be configured globally
|
||||
// Just use the global values
|
||||
|
@ -1087,13 +1052,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
// otherwise, add it to ranNodes for further process
|
||||
app.ranNodes.add(nodeAddedEvent.getNodeId());
|
||||
|
||||
if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
|
||||
app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
|
||||
LogAggregationReport.newInstance(app.applicationId,
|
||||
app.logAggregationEnabled ? LogAggregationStatus.NOT_START
|
||||
: LogAggregationStatus.DISABLED, ""));
|
||||
}
|
||||
};
|
||||
app.logAggregation.addReportIfNecessary(
|
||||
nodeAddedEvent.getNodeId(), app.getApplicationId());
|
||||
}
|
||||
}
|
||||
|
||||
// synchronously recover attempt to ensure any incoming external events
|
||||
|
@ -1507,7 +1468,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
}
|
||||
|
||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
app.logAggregationStartTime = app.systemClock.getTime();
|
||||
app.logAggregation
|
||||
.recordLogAggregationStartTime(app.systemClock.getTime());
|
||||
for (NodeId nodeId : app.getRanNodes()) {
|
||||
app.handler.handle(
|
||||
new RMNodeCleanAppEvent(nodeId, app.applicationId));
|
||||
|
@ -1765,263 +1727,31 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
|
||||
@Override
|
||||
public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
|
||||
try {
|
||||
this.readLock.lock();
|
||||
if (!isLogAggregationFinished() && isAppInFinalState(this) &&
|
||||
systemClock.getTime() > this.logAggregationStartTime
|
||||
+ this.logAggregationStatusTimeout) {
|
||||
for (Entry<NodeId, LogAggregationReport> output :
|
||||
logAggregationStatus.entrySet()) {
|
||||
if (!output.getValue().getLogAggregationStatus()
|
||||
.equals(LogAggregationStatus.TIME_OUT)
|
||||
&& !output.getValue().getLogAggregationStatus()
|
||||
.equals(LogAggregationStatus.SUCCEEDED)
|
||||
&& !output.getValue().getLogAggregationStatus()
|
||||
.equals(LogAggregationStatus.FAILED)) {
|
||||
output.getValue().setLogAggregationStatus(
|
||||
LogAggregationStatus.TIME_OUT);
|
||||
}
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableMap(logAggregationStatus);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
return logAggregation.getLogAggregationReportsForApp(this);
|
||||
}
|
||||
|
||||
public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
|
||||
try {
|
||||
this.writeLock.lock();
|
||||
if (this.logAggregationEnabled && !isLogAggregationFinished()) {
|
||||
LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
|
||||
boolean stateChangedToFinal = false;
|
||||
if (curReport == null) {
|
||||
this.logAggregationStatus.put(nodeId, report);
|
||||
if (isLogAggregationFinishedForNM(report)) {
|
||||
stateChangedToFinal = true;
|
||||
}
|
||||
} else {
|
||||
if (isLogAggregationFinishedForNM(report)) {
|
||||
if (!isLogAggregationFinishedForNM(curReport)) {
|
||||
stateChangedToFinal = true;
|
||||
}
|
||||
}
|
||||
if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
|
||||
|| curReport.getLogAggregationStatus() !=
|
||||
LogAggregationStatus.RUNNING_WITH_FAILURE) {
|
||||
if (curReport.getLogAggregationStatus()
|
||||
== LogAggregationStatus.TIME_OUT
|
||||
&& report.getLogAggregationStatus()
|
||||
== LogAggregationStatus.RUNNING) {
|
||||
// If the log aggregation status got from latest nm heartbeat
|
||||
// is Running, and current log aggregation status is TimeOut,
|
||||
// based on whether there are any failure messages for this NM,
|
||||
// we will reset the log aggregation status as RUNNING or
|
||||
// RUNNING_WITH_FAILURE
|
||||
if (logAggregationFailureMessagesForNMs.get(nodeId) != null &&
|
||||
!logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) {
|
||||
report.setLogAggregationStatus(
|
||||
LogAggregationStatus.RUNNING_WITH_FAILURE);
|
||||
}
|
||||
}
|
||||
curReport.setLogAggregationStatus(report
|
||||
.getLogAggregationStatus());
|
||||
}
|
||||
}
|
||||
updateLogAggregationDiagnosticMessages(nodeId, report);
|
||||
if (isAppInFinalState(this) && stateChangedToFinal) {
|
||||
updateLogAggregationStatus(nodeId);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogAggregationStatus getLogAggregationStatusForAppReport() {
|
||||
try {
|
||||
this.readLock.lock();
|
||||
if (! logAggregationEnabled) {
|
||||
return LogAggregationStatus.DISABLED;
|
||||
}
|
||||
if (isLogAggregationFinished()) {
|
||||
return this.logAggregationStatusForAppReport;
|
||||
}
|
||||
Map<NodeId, LogAggregationReport> reports =
|
||||
getLogAggregationReportsForApp();
|
||||
if (reports.size() == 0) {
|
||||
return this.logAggregationStatusForAppReport;
|
||||
}
|
||||
int logNotStartCount = 0;
|
||||
int logCompletedCount = 0;
|
||||
int logTimeOutCount = 0;
|
||||
int logFailedCount = 0;
|
||||
int logRunningWithFailure = 0;
|
||||
for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) {
|
||||
switch (report.getValue().getLogAggregationStatus()) {
|
||||
case NOT_START:
|
||||
logNotStartCount++;
|
||||
break;
|
||||
case RUNNING_WITH_FAILURE:
|
||||
logRunningWithFailure ++;
|
||||
break;
|
||||
case SUCCEEDED:
|
||||
logCompletedCount++;
|
||||
break;
|
||||
case FAILED:
|
||||
logFailedCount++;
|
||||
logCompletedCount++;
|
||||
break;
|
||||
case TIME_OUT:
|
||||
logTimeOutCount++;
|
||||
logCompletedCount++;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (logNotStartCount == reports.size()) {
|
||||
return LogAggregationStatus.NOT_START;
|
||||
} else if (logCompletedCount == reports.size()) {
|
||||
// We should satisfy two condition in order to return SUCCEEDED or FAILED
|
||||
// 1) make sure the application is in final state
|
||||
// 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT
|
||||
// The SUCCEEDED/FAILED status is the final status which means
|
||||
// the log aggregation is finished. And the log aggregation status will
|
||||
// not be updated anymore.
|
||||
if (logFailedCount > 0 && isAppInFinalState(this)) {
|
||||
this.logAggregationStatusForAppReport =
|
||||
LogAggregationStatus.FAILED;
|
||||
return LogAggregationStatus.FAILED;
|
||||
} else if (logTimeOutCount > 0) {
|
||||
this.logAggregationStatusForAppReport =
|
||||
LogAggregationStatus.TIME_OUT;
|
||||
return LogAggregationStatus.TIME_OUT;
|
||||
}
|
||||
if (isAppInFinalState(this)) {
|
||||
this.logAggregationStatusForAppReport =
|
||||
LogAggregationStatus.SUCCEEDED;
|
||||
return LogAggregationStatus.SUCCEEDED;
|
||||
}
|
||||
} else if (logRunningWithFailure > 0) {
|
||||
return LogAggregationStatus.RUNNING_WITH_FAILURE;
|
||||
}
|
||||
return LogAggregationStatus.RUNNING;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLogAggregationEnabled() {
|
||||
return logAggregationEnabled;
|
||||
logAggregation.aggregateLogReport(nodeId, report, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLogAggregationFinished() {
|
||||
return this.logAggregationStatusForAppReport
|
||||
.equals(LogAggregationStatus.SUCCEEDED)
|
||||
|| this.logAggregationStatusForAppReport
|
||||
.equals(LogAggregationStatus.FAILED)
|
||||
|| this.logAggregationStatusForAppReport
|
||||
.equals(LogAggregationStatus.TIME_OUT);
|
||||
|
||||
return logAggregation.isFinished();
|
||||
}
|
||||
|
||||
private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
|
||||
return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED
|
||||
|| report.getLogAggregationStatus() == LogAggregationStatus.FAILED;
|
||||
}
|
||||
|
||||
private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
|
||||
LogAggregationReport report) {
|
||||
if (report.getDiagnosticMessage() != null
|
||||
&& !report.getDiagnosticMessage().isEmpty()) {
|
||||
if (report.getLogAggregationStatus()
|
||||
== LogAggregationStatus.RUNNING ) {
|
||||
List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
|
||||
if (diagnostics == null) {
|
||||
diagnostics = new ArrayList<String>();
|
||||
logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
|
||||
} else {
|
||||
if (diagnostics.size()
|
||||
== maxLogAggregationDiagnosticsInMemory) {
|
||||
diagnostics.remove(0);
|
||||
}
|
||||
}
|
||||
diagnostics.add(report.getDiagnosticMessage());
|
||||
this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
|
||||
StringUtils.join(diagnostics, "\n"));
|
||||
} else if (report.getLogAggregationStatus()
|
||||
== LogAggregationStatus.RUNNING_WITH_FAILURE) {
|
||||
List<String> failureMessages =
|
||||
logAggregationFailureMessagesForNMs.get(nodeId);
|
||||
if (failureMessages == null) {
|
||||
failureMessages = new ArrayList<String>();
|
||||
logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
|
||||
} else {
|
||||
if (failureMessages.size()
|
||||
== maxLogAggregationDiagnosticsInMemory) {
|
||||
failureMessages.remove(0);
|
||||
}
|
||||
}
|
||||
failureMessages.add(report.getDiagnosticMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updateLogAggregationStatus(NodeId nodeId) {
|
||||
LogAggregationStatus status =
|
||||
this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
|
||||
if (status.equals(LogAggregationStatus.SUCCEEDED)) {
|
||||
this.logAggregationSucceed++;
|
||||
} else if (status.equals(LogAggregationStatus.FAILED)) {
|
||||
this.logAggregationFailed++;
|
||||
}
|
||||
if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
|
||||
this.logAggregationStatusForAppReport =
|
||||
LogAggregationStatus.SUCCEEDED;
|
||||
// Since the log aggregation status for this application for all NMs
|
||||
// is SUCCEEDED, it means all logs are aggregated successfully.
|
||||
// We could remove all the cached log aggregation reports
|
||||
this.logAggregationStatus.clear();
|
||||
this.logAggregationDiagnosticsForNMs.clear();
|
||||
this.logAggregationFailureMessagesForNMs.clear();
|
||||
} else if (this.logAggregationSucceed + this.logAggregationFailed
|
||||
== this.logAggregationStatus.size()) {
|
||||
this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
|
||||
// We have collected the log aggregation status for all NMs.
|
||||
// The log aggregation status is FAILED which means the log
|
||||
// aggregation fails in some NMs. We are only interested in the
|
||||
// nodes where the log aggregation is failed. So we could remove
|
||||
// the log aggregation details for those succeeded NMs
|
||||
for (Iterator<Map.Entry<NodeId, LogAggregationReport>> it =
|
||||
this.logAggregationStatus.entrySet().iterator(); it.hasNext();) {
|
||||
Map.Entry<NodeId, LogAggregationReport> entry = it.next();
|
||||
if (entry.getValue().getLogAggregationStatus()
|
||||
.equals(LogAggregationStatus.SUCCEEDED)) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
// the log aggregation has finished/failed.
|
||||
// and the status will not be updated anymore.
|
||||
this.logAggregationDiagnosticsForNMs.clear();
|
||||
}
|
||||
@Override
|
||||
public boolean isLogAggregationEnabled() {
|
||||
return logAggregation.isEnabled();
|
||||
}
|
||||
|
||||
public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
|
||||
try {
|
||||
this.readLock.lock();
|
||||
List<String> failureMessages =
|
||||
this.logAggregationFailureMessagesForNMs.get(nodeId);
|
||||
if (failureMessages == null || failureMessages.isEmpty()) {
|
||||
return StringUtils.EMPTY;
|
||||
}
|
||||
return StringUtils.join(failureMessages, "\n");
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
return logAggregation.getLogAggregationFailureMessagesForNM(nodeId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogAggregationStatus getLogAggregationStatusForAppReport() {
|
||||
return logAggregation
|
||||
.getLogAggregationStatusForAppReport(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2138,4 +1868,13 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||
RMAppState state){
|
||||
/* TODO fail the application on the failed transition */
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
long getLogAggregationStartTime() {
|
||||
return logAggregation.getLogAggregationStartTime();
|
||||
}
|
||||
|
||||
Clock getSystemClock() {
|
||||
return systemClock;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,383 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
/**
|
||||
* Log aggregation logic used by RMApp.
|
||||
*
|
||||
*/
|
||||
public class RMAppLogAggregation {
|
||||
private final boolean logAggregationEnabled;
|
||||
private final ReadLock readLock;
|
||||
private final WriteLock writeLock;
|
||||
private long logAggregationStartTime = 0;
|
||||
private final long logAggregationStatusTimeout;
|
||||
private final Map<NodeId, LogAggregationReport> logAggregationStatus =
|
||||
new ConcurrentHashMap<>();
|
||||
private volatile LogAggregationStatus logAggregationStatusForAppReport;
|
||||
private int logAggregationSucceed = 0;
|
||||
private int logAggregationFailed = 0;
|
||||
private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
|
||||
new HashMap<>();
|
||||
private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
|
||||
new HashMap<>();
|
||||
private final int maxLogAggregationDiagnosticsInMemory;
|
||||
|
||||
RMAppLogAggregation(Configuration conf, ReadLock readLock,
|
||||
WriteLock writeLock) {
|
||||
this.readLock = readLock;
|
||||
this.writeLock = writeLock;
|
||||
this.logAggregationStatusTimeout = getLogAggregationStatusTimeout(conf);
|
||||
this.logAggregationEnabled = getEnabledFlagFromConf(conf);
|
||||
this.logAggregationStatusForAppReport =
|
||||
this.logAggregationEnabled ? LogAggregationStatus.NOT_START :
|
||||
LogAggregationStatus.DISABLED;
|
||||
this.maxLogAggregationDiagnosticsInMemory =
|
||||
getMaxLogAggregationDiagnostics(conf);
|
||||
}
|
||||
|
||||
private long getLogAggregationStatusTimeout(Configuration conf) {
|
||||
long statusTimeout =
|
||||
conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
|
||||
if (statusTimeout <= 0) {
|
||||
return YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
|
||||
} else {
|
||||
return statusTimeout;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean getEnabledFlagFromConf(Configuration conf) {
|
||||
return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
|
||||
}
|
||||
|
||||
private int getMaxLogAggregationDiagnostics(Configuration conf) {
|
||||
return conf.getInt(
|
||||
YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
|
||||
YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
|
||||
}
|
||||
|
||||
Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp(
|
||||
RMAppImpl rmApp) {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
if (!isLogAggregationFinished() && RMAppImpl.isAppInFinalState(rmApp) &&
|
||||
rmApp.getSystemClock().getTime() > this.logAggregationStartTime
|
||||
+ this.logAggregationStatusTimeout) {
|
||||
for (Map.Entry<NodeId, LogAggregationReport> output :
|
||||
logAggregationStatus.entrySet()) {
|
||||
if (!output.getValue().getLogAggregationStatus()
|
||||
.equals(LogAggregationStatus.TIME_OUT)
|
||||
&& !output.getValue().getLogAggregationStatus()
|
||||
.equals(LogAggregationStatus.SUCCEEDED)
|
||||
&& !output.getValue().getLogAggregationStatus()
|
||||
.equals(LogAggregationStatus.FAILED)) {
|
||||
output.getValue().setLogAggregationStatus(
|
||||
LogAggregationStatus.TIME_OUT);
|
||||
}
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableMap(logAggregationStatus);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void aggregateLogReport(NodeId nodeId, LogAggregationReport report,
|
||||
RMAppImpl rmApp) {
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
if (this.logAggregationEnabled && !isLogAggregationFinished()) {
|
||||
LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
|
||||
boolean stateChangedToFinal = false;
|
||||
if (curReport == null) {
|
||||
this.logAggregationStatus.put(nodeId, report);
|
||||
if (isLogAggregationFinishedForNM(report)) {
|
||||
stateChangedToFinal = true;
|
||||
}
|
||||
} else {
|
||||
if (isLogAggregationFinishedForNM(report)) {
|
||||
if (!isLogAggregationFinishedForNM(curReport)) {
|
||||
stateChangedToFinal = true;
|
||||
}
|
||||
}
|
||||
if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
|
||||
|| curReport.getLogAggregationStatus() !=
|
||||
LogAggregationStatus.RUNNING_WITH_FAILURE) {
|
||||
if (curReport.getLogAggregationStatus()
|
||||
== LogAggregationStatus.TIME_OUT
|
||||
&& report.getLogAggregationStatus()
|
||||
== LogAggregationStatus.RUNNING) {
|
||||
// If the log aggregation status got from latest NM heartbeat
|
||||
// is RUNNING, and current log aggregation status is TIME_OUT,
|
||||
// based on whether there are any failure messages for this NM,
|
||||
// we will reset the log aggregation status as RUNNING or
|
||||
// RUNNING_WITH_FAILURE
|
||||
if (isThereFailureMessageForNM(nodeId)) {
|
||||
report.setLogAggregationStatus(
|
||||
LogAggregationStatus.RUNNING_WITH_FAILURE);
|
||||
}
|
||||
}
|
||||
curReport.setLogAggregationStatus(report
|
||||
.getLogAggregationStatus());
|
||||
}
|
||||
}
|
||||
updateLogAggregationDiagnosticMessages(nodeId, report);
|
||||
if (RMAppImpl.isAppInFinalState(rmApp) && stateChangedToFinal) {
|
||||
updateLogAggregationStatus(nodeId);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public LogAggregationStatus getLogAggregationStatusForAppReport(
|
||||
RMAppImpl rmApp) {
|
||||
boolean appInFinalState = RMAppImpl.isAppInFinalState(rmApp);
|
||||
this.readLock.lock();
|
||||
try {
|
||||
if (!logAggregationEnabled) {
|
||||
return LogAggregationStatus.DISABLED;
|
||||
}
|
||||
if (isLogAggregationFinished()) {
|
||||
return this.logAggregationStatusForAppReport;
|
||||
}
|
||||
Map<NodeId, LogAggregationReport> reports =
|
||||
getLogAggregationReportsForApp(rmApp);
|
||||
if (reports.size() == 0) {
|
||||
return this.logAggregationStatusForAppReport;
|
||||
}
|
||||
int logNotStartCount = 0;
|
||||
int logCompletedCount = 0;
|
||||
int logTimeOutCount = 0;
|
||||
int logFailedCount = 0;
|
||||
int logRunningWithFailure = 0;
|
||||
for (Map.Entry<NodeId, LogAggregationReport> report :
|
||||
reports.entrySet()) {
|
||||
switch (report.getValue().getLogAggregationStatus()) {
|
||||
case NOT_START:
|
||||
logNotStartCount++;
|
||||
break;
|
||||
case RUNNING_WITH_FAILURE:
|
||||
logRunningWithFailure ++;
|
||||
break;
|
||||
case SUCCEEDED:
|
||||
logCompletedCount++;
|
||||
break;
|
||||
case FAILED:
|
||||
logFailedCount++;
|
||||
logCompletedCount++;
|
||||
break;
|
||||
case TIME_OUT:
|
||||
logTimeOutCount++;
|
||||
logCompletedCount++;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (logNotStartCount == reports.size()) {
|
||||
return LogAggregationStatus.NOT_START;
|
||||
} else if (logCompletedCount == reports.size()) {
|
||||
// We should satisfy two condition in order to return
|
||||
// SUCCEEDED or FAILED.
|
||||
// 1) make sure the application is in final state
|
||||
// 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT
|
||||
// The SUCCEEDED/FAILED status is the final status which means
|
||||
// the log aggregation is finished. And the log aggregation status will
|
||||
// not be updated anymore.
|
||||
if (logFailedCount > 0 && appInFinalState) {
|
||||
this.logAggregationStatusForAppReport =
|
||||
LogAggregationStatus.FAILED;
|
||||
return LogAggregationStatus.FAILED;
|
||||
} else if (logTimeOutCount > 0) {
|
||||
this.logAggregationStatusForAppReport =
|
||||
LogAggregationStatus.TIME_OUT;
|
||||
return LogAggregationStatus.TIME_OUT;
|
||||
}
|
||||
if (appInFinalState) {
|
||||
this.logAggregationStatusForAppReport =
|
||||
LogAggregationStatus.SUCCEEDED;
|
||||
return LogAggregationStatus.SUCCEEDED;
|
||||
}
|
||||
} else if (logRunningWithFailure > 0) {
|
||||
return LogAggregationStatus.RUNNING_WITH_FAILURE;
|
||||
}
|
||||
return LogAggregationStatus.RUNNING;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isLogAggregationFinished() {
|
||||
return this.logAggregationStatusForAppReport
|
||||
.equals(LogAggregationStatus.SUCCEEDED)
|
||||
|| this.logAggregationStatusForAppReport
|
||||
.equals(LogAggregationStatus.FAILED)
|
||||
|| this.logAggregationStatusForAppReport
|
||||
.equals(LogAggregationStatus.TIME_OUT);
|
||||
|
||||
}
|
||||
|
||||
private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
|
||||
return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED
|
||||
|| report.getLogAggregationStatus() == LogAggregationStatus.FAILED;
|
||||
}
|
||||
|
||||
private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
|
||||
LogAggregationReport report) {
|
||||
if (report.getDiagnosticMessage() != null
|
||||
&& !report.getDiagnosticMessage().isEmpty()) {
|
||||
if (report.getLogAggregationStatus()
|
||||
== LogAggregationStatus.RUNNING ) {
|
||||
List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
|
||||
if (diagnostics == null) {
|
||||
diagnostics = new ArrayList<>();
|
||||
logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
|
||||
} else {
|
||||
if (diagnostics.size()
|
||||
== maxLogAggregationDiagnosticsInMemory) {
|
||||
diagnostics.remove(0);
|
||||
}
|
||||
}
|
||||
diagnostics.add(report.getDiagnosticMessage());
|
||||
this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
|
||||
StringUtils.join(diagnostics, "\n"));
|
||||
} else if (report.getLogAggregationStatus()
|
||||
== LogAggregationStatus.RUNNING_WITH_FAILURE) {
|
||||
List<String> failureMessages =
|
||||
logAggregationFailureMessagesForNMs.get(nodeId);
|
||||
if (failureMessages == null) {
|
||||
failureMessages = new ArrayList<>();
|
||||
logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
|
||||
} else {
|
||||
if (failureMessages.size()
|
||||
== maxLogAggregationDiagnosticsInMemory) {
|
||||
failureMessages.remove(0);
|
||||
}
|
||||
}
|
||||
failureMessages.add(report.getDiagnosticMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void updateLogAggregationStatus(NodeId nodeId) {
|
||||
LogAggregationStatus status =
|
||||
this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
|
||||
if (status.equals(LogAggregationStatus.SUCCEEDED)) {
|
||||
this.logAggregationSucceed++;
|
||||
} else if (status.equals(LogAggregationStatus.FAILED)) {
|
||||
this.logAggregationFailed++;
|
||||
}
|
||||
if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
|
||||
this.logAggregationStatusForAppReport =
|
||||
LogAggregationStatus.SUCCEEDED;
|
||||
// Since the log aggregation status for this application for all NMs
|
||||
// is SUCCEEDED, it means all logs are aggregated successfully.
|
||||
// We could remove all the cached log aggregation reports
|
||||
this.logAggregationStatus.clear();
|
||||
this.logAggregationDiagnosticsForNMs.clear();
|
||||
this.logAggregationFailureMessagesForNMs.clear();
|
||||
} else if (this.logAggregationSucceed + this.logAggregationFailed
|
||||
== this.logAggregationStatus.size()) {
|
||||
this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
|
||||
// We have collected the log aggregation status for all NMs.
|
||||
// The log aggregation status is FAILED which means the log
|
||||
// aggregation fails in some NMs. We are only interested in the
|
||||
// nodes where the log aggregation is failed. So we could remove
|
||||
// the log aggregation details for those succeeded NMs
|
||||
this.logAggregationStatus.entrySet().removeIf(entry ->
|
||||
entry.getValue().getLogAggregationStatus()
|
||||
.equals(LogAggregationStatus.SUCCEEDED));
|
||||
// the log aggregation has finished/failed.
|
||||
// and the status will not be updated anymore.
|
||||
this.logAggregationDiagnosticsForNMs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
List<String> failureMessages =
|
||||
this.logAggregationFailureMessagesForNMs.get(nodeId);
|
||||
if (failureMessages == null || failureMessages.isEmpty()) {
|
||||
return StringUtils.EMPTY;
|
||||
}
|
||||
return StringUtils.join(failureMessages, "\n");
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void recordLogAggregationStartTime(long time) {
|
||||
logAggregationStartTime = time;
|
||||
}
|
||||
|
||||
public boolean isEnabled() {
|
||||
return logAggregationEnabled;
|
||||
}
|
||||
|
||||
private boolean hasReportForNodeManager(NodeId nodeId) {
|
||||
return logAggregationStatus.containsKey(nodeId);
|
||||
}
|
||||
|
||||
private void addReportForNodeManager(NodeId nodeId,
|
||||
LogAggregationReport report) {
|
||||
logAggregationStatus.put(nodeId, report);
|
||||
}
|
||||
|
||||
public boolean isFinished() {
|
||||
return isLogAggregationFinished();
|
||||
}
|
||||
|
||||
private boolean isThereFailureMessageForNM(NodeId nodeId) {
|
||||
return logAggregationFailureMessagesForNMs.get(nodeId) != null
|
||||
&& !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty();
|
||||
}
|
||||
|
||||
long getLogAggregationStartTime() {
|
||||
return logAggregationStartTime;
|
||||
}
|
||||
|
||||
void addReportIfNecessary(NodeId nodeId, ApplicationId applicationId) {
|
||||
if (!hasReportForNodeManager(nodeId)) {
|
||||
LogAggregationStatus status = isEnabled() ? LogAggregationStatus.NOT_START
|
||||
: LogAggregationStatus.DISABLED;
|
||||
addReportForNodeManager(nodeId,
|
||||
LogAggregationReport.newInstance(applicationId, status, ""));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue