diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 2d22bb9c7d1..9d3225745c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -19,18 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import java.net.InetAddress; -import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -181,19 +177,7 @@ public class RMAppImpl implements RMApp, Recoverable { new AppFinishedTransition(); private Set ranNodes = new ConcurrentSkipListSet(); - private final boolean logAggregationEnabled; - private long logAggregationStartTime = 0; - private final long logAggregationStatusTimeout; - private final Map logAggregationStatus = - new ConcurrentHashMap(); - private volatile LogAggregationStatus logAggregationStatusForAppReport; - private int logAggregationSucceed = 0; - private int logAggregationFailed = 0; - private Map> logAggregationDiagnosticsForNMs = - new HashMap>(); - private Map> logAggregationFailureMessagesForNMs = - new HashMap>(); - private final int maxLogAggregationDiagnosticsInMemory; + private final RMAppLogAggregation logAggregation; private Map applicationTimeouts = new HashMap(); @@ -510,26 +494,7 @@ public class RMAppImpl implements RMApp, Recoverable { applicationSchedulingEnvs .putAll(submissionContext.getApplicationSchedulingPropertiesMap()); - long localLogAggregationStatusTimeout = - conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS); - if (localLogAggregationStatusTimeout <= 0) { - this.logAggregationStatusTimeout = - YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS; - } else { - this.logAggregationStatusTimeout = localLogAggregationStatusTimeout; - } - this.logAggregationEnabled = - conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); - if (this.logAggregationEnabled) { - this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START; - } else { - this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED; - } - maxLogAggregationDiagnosticsInMemory = conf.getInt( - YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY, - YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY); + this.logAggregation = new RMAppLogAggregation(conf, readLock, writeLock); // amBlacklistingEnabled can be configured globally // Just use the global values @@ -1087,13 +1052,9 @@ public class RMAppImpl implements RMApp, Recoverable { // otherwise, add it to ranNodes for further process app.ranNodes.add(nodeAddedEvent.getNodeId()); - if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) { - app.logAggregationStatus.put(nodeAddedEvent.getNodeId(), - LogAggregationReport.newInstance(app.applicationId, - app.logAggregationEnabled ? LogAggregationStatus.NOT_START - : LogAggregationStatus.DISABLED, "")); - } - }; + app.logAggregation.addReportIfNecessary( + nodeAddedEvent.getNodeId(), app.getApplicationId()); + } } // synchronously recover attempt to ensure any incoming external events @@ -1507,7 +1468,8 @@ public class RMAppImpl implements RMApp, Recoverable { } public void transition(RMAppImpl app, RMAppEvent event) { - app.logAggregationStartTime = app.systemClock.getTime(); + app.logAggregation + .recordLogAggregationStartTime(app.systemClock.getTime()); for (NodeId nodeId : app.getRanNodes()) { app.handler.handle( new RMNodeCleanAppEvent(nodeId, app.applicationId)); @@ -1765,263 +1727,31 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public Map getLogAggregationReportsForApp() { - try { - this.readLock.lock(); - if (!isLogAggregationFinished() && isAppInFinalState(this) && - systemClock.getTime() > this.logAggregationStartTime - + this.logAggregationStatusTimeout) { - for (Entry output : - logAggregationStatus.entrySet()) { - if (!output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.TIME_OUT) - && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.SUCCEEDED) - && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.FAILED)) { - output.getValue().setLogAggregationStatus( - LogAggregationStatus.TIME_OUT); - } - } - } - return Collections.unmodifiableMap(logAggregationStatus); - } finally { - this.readLock.unlock(); - } + return logAggregation.getLogAggregationReportsForApp(this); } public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { - try { - this.writeLock.lock(); - if (this.logAggregationEnabled && !isLogAggregationFinished()) { - LogAggregationReport curReport = this.logAggregationStatus.get(nodeId); - boolean stateChangedToFinal = false; - if (curReport == null) { - this.logAggregationStatus.put(nodeId, report); - if (isLogAggregationFinishedForNM(report)) { - stateChangedToFinal = true; - } - } else { - if (isLogAggregationFinishedForNM(report)) { - if (!isLogAggregationFinishedForNM(curReport)) { - stateChangedToFinal = true; - } - } - if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING - || curReport.getLogAggregationStatus() != - LogAggregationStatus.RUNNING_WITH_FAILURE) { - if (curReport.getLogAggregationStatus() - == LogAggregationStatus.TIME_OUT - && report.getLogAggregationStatus() - == LogAggregationStatus.RUNNING) { - // If the log aggregation status got from latest nm heartbeat - // is Running, and current log aggregation status is TimeOut, - // based on whether there are any failure messages for this NM, - // we will reset the log aggregation status as RUNNING or - // RUNNING_WITH_FAILURE - if (logAggregationFailureMessagesForNMs.get(nodeId) != null && - !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) { - report.setLogAggregationStatus( - LogAggregationStatus.RUNNING_WITH_FAILURE); - } - } - curReport.setLogAggregationStatus(report - .getLogAggregationStatus()); - } - } - updateLogAggregationDiagnosticMessages(nodeId, report); - if (isAppInFinalState(this) && stateChangedToFinal) { - updateLogAggregationStatus(nodeId); - } - } - } finally { - this.writeLock.unlock(); - } - } - - @Override - public LogAggregationStatus getLogAggregationStatusForAppReport() { - try { - this.readLock.lock(); - if (! logAggregationEnabled) { - return LogAggregationStatus.DISABLED; - } - if (isLogAggregationFinished()) { - return this.logAggregationStatusForAppReport; - } - Map reports = - getLogAggregationReportsForApp(); - if (reports.size() == 0) { - return this.logAggregationStatusForAppReport; - } - int logNotStartCount = 0; - int logCompletedCount = 0; - int logTimeOutCount = 0; - int logFailedCount = 0; - int logRunningWithFailure = 0; - for (Entry report : reports.entrySet()) { - switch (report.getValue().getLogAggregationStatus()) { - case NOT_START: - logNotStartCount++; - break; - case RUNNING_WITH_FAILURE: - logRunningWithFailure ++; - break; - case SUCCEEDED: - logCompletedCount++; - break; - case FAILED: - logFailedCount++; - logCompletedCount++; - break; - case TIME_OUT: - logTimeOutCount++; - logCompletedCount++; - break; - default: - break; - } - } - if (logNotStartCount == reports.size()) { - return LogAggregationStatus.NOT_START; - } else if (logCompletedCount == reports.size()) { - // We should satisfy two condition in order to return SUCCEEDED or FAILED - // 1) make sure the application is in final state - // 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT - // The SUCCEEDED/FAILED status is the final status which means - // the log aggregation is finished. And the log aggregation status will - // not be updated anymore. - if (logFailedCount > 0 && isAppInFinalState(this)) { - this.logAggregationStatusForAppReport = - LogAggregationStatus.FAILED; - return LogAggregationStatus.FAILED; - } else if (logTimeOutCount > 0) { - this.logAggregationStatusForAppReport = - LogAggregationStatus.TIME_OUT; - return LogAggregationStatus.TIME_OUT; - } - if (isAppInFinalState(this)) { - this.logAggregationStatusForAppReport = - LogAggregationStatus.SUCCEEDED; - return LogAggregationStatus.SUCCEEDED; - } - } else if (logRunningWithFailure > 0) { - return LogAggregationStatus.RUNNING_WITH_FAILURE; - } - return LogAggregationStatus.RUNNING; - } finally { - this.readLock.unlock(); - } - } - - @Override - public boolean isLogAggregationEnabled() { - return logAggregationEnabled; + logAggregation.aggregateLogReport(nodeId, report, this); } @Override public boolean isLogAggregationFinished() { - return this.logAggregationStatusForAppReport - .equals(LogAggregationStatus.SUCCEEDED) - || this.logAggregationStatusForAppReport - .equals(LogAggregationStatus.FAILED) - || this.logAggregationStatusForAppReport - .equals(LogAggregationStatus.TIME_OUT); - + return logAggregation.isFinished(); } - private boolean isLogAggregationFinishedForNM(LogAggregationReport report) { - return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED - || report.getLogAggregationStatus() == LogAggregationStatus.FAILED; - } - - private void updateLogAggregationDiagnosticMessages(NodeId nodeId, - LogAggregationReport report) { - if (report.getDiagnosticMessage() != null - && !report.getDiagnosticMessage().isEmpty()) { - if (report.getLogAggregationStatus() - == LogAggregationStatus.RUNNING ) { - List diagnostics = logAggregationDiagnosticsForNMs.get(nodeId); - if (diagnostics == null) { - diagnostics = new ArrayList(); - logAggregationDiagnosticsForNMs.put(nodeId, diagnostics); - } else { - if (diagnostics.size() - == maxLogAggregationDiagnosticsInMemory) { - diagnostics.remove(0); - } - } - diagnostics.add(report.getDiagnosticMessage()); - this.logAggregationStatus.get(nodeId).setDiagnosticMessage( - StringUtils.join(diagnostics, "\n")); - } else if (report.getLogAggregationStatus() - == LogAggregationStatus.RUNNING_WITH_FAILURE) { - List failureMessages = - logAggregationFailureMessagesForNMs.get(nodeId); - if (failureMessages == null) { - failureMessages = new ArrayList(); - logAggregationFailureMessagesForNMs.put(nodeId, failureMessages); - } else { - if (failureMessages.size() - == maxLogAggregationDiagnosticsInMemory) { - failureMessages.remove(0); - } - } - failureMessages.add(report.getDiagnosticMessage()); - } - } - } - - private void updateLogAggregationStatus(NodeId nodeId) { - LogAggregationStatus status = - this.logAggregationStatus.get(nodeId).getLogAggregationStatus(); - if (status.equals(LogAggregationStatus.SUCCEEDED)) { - this.logAggregationSucceed++; - } else if (status.equals(LogAggregationStatus.FAILED)) { - this.logAggregationFailed++; - } - if (this.logAggregationSucceed == this.logAggregationStatus.size()) { - this.logAggregationStatusForAppReport = - LogAggregationStatus.SUCCEEDED; - // Since the log aggregation status for this application for all NMs - // is SUCCEEDED, it means all logs are aggregated successfully. - // We could remove all the cached log aggregation reports - this.logAggregationStatus.clear(); - this.logAggregationDiagnosticsForNMs.clear(); - this.logAggregationFailureMessagesForNMs.clear(); - } else if (this.logAggregationSucceed + this.logAggregationFailed - == this.logAggregationStatus.size()) { - this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED; - // We have collected the log aggregation status for all NMs. - // The log aggregation status is FAILED which means the log - // aggregation fails in some NMs. We are only interested in the - // nodes where the log aggregation is failed. So we could remove - // the log aggregation details for those succeeded NMs - for (Iterator> it = - this.logAggregationStatus.entrySet().iterator(); it.hasNext();) { - Map.Entry entry = it.next(); - if (entry.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.SUCCEEDED)) { - it.remove(); - } - } - // the log aggregation has finished/failed. - // and the status will not be updated anymore. - this.logAggregationDiagnosticsForNMs.clear(); - } + @Override + public boolean isLogAggregationEnabled() { + return logAggregation.isEnabled(); } public String getLogAggregationFailureMessagesForNM(NodeId nodeId) { - try { - this.readLock.lock(); - List failureMessages = - this.logAggregationFailureMessagesForNMs.get(nodeId); - if (failureMessages == null || failureMessages.isEmpty()) { - return StringUtils.EMPTY; - } - return StringUtils.join(failureMessages, "\n"); - } finally { - this.readLock.unlock(); - } + return logAggregation.getLogAggregationFailureMessagesForNM(nodeId); + } + + @Override + public LogAggregationStatus getLogAggregationStatusForAppReport() { + return logAggregation + .getLogAggregationStatusForAppReport(this); } @Override @@ -2138,4 +1868,13 @@ public class RMAppImpl implements RMApp, Recoverable { RMAppState state){ /* TODO fail the application on the failed transition */ } + + @VisibleForTesting + long getLogAggregationStartTime() { + return logAggregation.getLogAggregationStartTime(); + } + + Clock getSystemClock() { + return systemClock; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java new file mode 100644 index 00000000000..b4409ff8801 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregation.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +/** + * Log aggregation logic used by RMApp. + * + */ +public class RMAppLogAggregation { + private final boolean logAggregationEnabled; + private final ReadLock readLock; + private final WriteLock writeLock; + private long logAggregationStartTime = 0; + private final long logAggregationStatusTimeout; + private final Map logAggregationStatus = + new ConcurrentHashMap<>(); + private volatile LogAggregationStatus logAggregationStatusForAppReport; + private int logAggregationSucceed = 0; + private int logAggregationFailed = 0; + private Map> logAggregationDiagnosticsForNMs = + new HashMap<>(); + private Map> logAggregationFailureMessagesForNMs = + new HashMap<>(); + private final int maxLogAggregationDiagnosticsInMemory; + + RMAppLogAggregation(Configuration conf, ReadLock readLock, + WriteLock writeLock) { + this.readLock = readLock; + this.writeLock = writeLock; + this.logAggregationStatusTimeout = getLogAggregationStatusTimeout(conf); + this.logAggregationEnabled = getEnabledFlagFromConf(conf); + this.logAggregationStatusForAppReport = + this.logAggregationEnabled ? LogAggregationStatus.NOT_START : + LogAggregationStatus.DISABLED; + this.maxLogAggregationDiagnosticsInMemory = + getMaxLogAggregationDiagnostics(conf); + } + + private long getLogAggregationStatusTimeout(Configuration conf) { + long statusTimeout = + conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS); + if (statusTimeout <= 0) { + return YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS; + } else { + return statusTimeout; + } + } + + private boolean getEnabledFlagFromConf(Configuration conf) { + return conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + } + + private int getMaxLogAggregationDiagnostics(Configuration conf) { + return conf.getInt( + YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY, + YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY); + } + + Map getLogAggregationReportsForApp( + RMAppImpl rmApp) { + this.readLock.lock(); + try { + if (!isLogAggregationFinished() && RMAppImpl.isAppInFinalState(rmApp) && + rmApp.getSystemClock().getTime() > this.logAggregationStartTime + + this.logAggregationStatusTimeout) { + for (Map.Entry output : + logAggregationStatus.entrySet()) { + if (!output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.TIME_OUT) + && !output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.SUCCEEDED) + && !output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.FAILED)) { + output.getValue().setLogAggregationStatus( + LogAggregationStatus.TIME_OUT); + } + } + } + return Collections.unmodifiableMap(logAggregationStatus); + } finally { + this.readLock.unlock(); + } + } + + void aggregateLogReport(NodeId nodeId, LogAggregationReport report, + RMAppImpl rmApp) { + this.writeLock.lock(); + try { + if (this.logAggregationEnabled && !isLogAggregationFinished()) { + LogAggregationReport curReport = this.logAggregationStatus.get(nodeId); + boolean stateChangedToFinal = false; + if (curReport == null) { + this.logAggregationStatus.put(nodeId, report); + if (isLogAggregationFinishedForNM(report)) { + stateChangedToFinal = true; + } + } else { + if (isLogAggregationFinishedForNM(report)) { + if (!isLogAggregationFinishedForNM(curReport)) { + stateChangedToFinal = true; + } + } + if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING + || curReport.getLogAggregationStatus() != + LogAggregationStatus.RUNNING_WITH_FAILURE) { + if (curReport.getLogAggregationStatus() + == LogAggregationStatus.TIME_OUT + && report.getLogAggregationStatus() + == LogAggregationStatus.RUNNING) { + // If the log aggregation status got from latest NM heartbeat + // is RUNNING, and current log aggregation status is TIME_OUT, + // based on whether there are any failure messages for this NM, + // we will reset the log aggregation status as RUNNING or + // RUNNING_WITH_FAILURE + if (isThereFailureMessageForNM(nodeId)) { + report.setLogAggregationStatus( + LogAggregationStatus.RUNNING_WITH_FAILURE); + } + } + curReport.setLogAggregationStatus(report + .getLogAggregationStatus()); + } + } + updateLogAggregationDiagnosticMessages(nodeId, report); + if (RMAppImpl.isAppInFinalState(rmApp) && stateChangedToFinal) { + updateLogAggregationStatus(nodeId); + } + } + } finally { + this.writeLock.unlock(); + } + } + + public LogAggregationStatus getLogAggregationStatusForAppReport( + RMAppImpl rmApp) { + boolean appInFinalState = RMAppImpl.isAppInFinalState(rmApp); + this.readLock.lock(); + try { + if (!logAggregationEnabled) { + return LogAggregationStatus.DISABLED; + } + if (isLogAggregationFinished()) { + return this.logAggregationStatusForAppReport; + } + Map reports = + getLogAggregationReportsForApp(rmApp); + if (reports.size() == 0) { + return this.logAggregationStatusForAppReport; + } + int logNotStartCount = 0; + int logCompletedCount = 0; + int logTimeOutCount = 0; + int logFailedCount = 0; + int logRunningWithFailure = 0; + for (Map.Entry report : + reports.entrySet()) { + switch (report.getValue().getLogAggregationStatus()) { + case NOT_START: + logNotStartCount++; + break; + case RUNNING_WITH_FAILURE: + logRunningWithFailure ++; + break; + case SUCCEEDED: + logCompletedCount++; + break; + case FAILED: + logFailedCount++; + logCompletedCount++; + break; + case TIME_OUT: + logTimeOutCount++; + logCompletedCount++; + break; + default: + break; + } + } + if (logNotStartCount == reports.size()) { + return LogAggregationStatus.NOT_START; + } else if (logCompletedCount == reports.size()) { + // We should satisfy two condition in order to return + // SUCCEEDED or FAILED. + // 1) make sure the application is in final state + // 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT + // The SUCCEEDED/FAILED status is the final status which means + // the log aggregation is finished. And the log aggregation status will + // not be updated anymore. + if (logFailedCount > 0 && appInFinalState) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.FAILED; + return LogAggregationStatus.FAILED; + } else if (logTimeOutCount > 0) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.TIME_OUT; + return LogAggregationStatus.TIME_OUT; + } + if (appInFinalState) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.SUCCEEDED; + return LogAggregationStatus.SUCCEEDED; + } + } else if (logRunningWithFailure > 0) { + return LogAggregationStatus.RUNNING_WITH_FAILURE; + } + return LogAggregationStatus.RUNNING; + } finally { + this.readLock.unlock(); + } + } + + private boolean isLogAggregationFinished() { + return this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.SUCCEEDED) + || this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.FAILED) + || this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.TIME_OUT); + + } + + private boolean isLogAggregationFinishedForNM(LogAggregationReport report) { + return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED + || report.getLogAggregationStatus() == LogAggregationStatus.FAILED; + } + + private void updateLogAggregationDiagnosticMessages(NodeId nodeId, + LogAggregationReport report) { + if (report.getDiagnosticMessage() != null + && !report.getDiagnosticMessage().isEmpty()) { + if (report.getLogAggregationStatus() + == LogAggregationStatus.RUNNING ) { + List diagnostics = logAggregationDiagnosticsForNMs.get(nodeId); + if (diagnostics == null) { + diagnostics = new ArrayList<>(); + logAggregationDiagnosticsForNMs.put(nodeId, diagnostics); + } else { + if (diagnostics.size() + == maxLogAggregationDiagnosticsInMemory) { + diagnostics.remove(0); + } + } + diagnostics.add(report.getDiagnosticMessage()); + this.logAggregationStatus.get(nodeId).setDiagnosticMessage( + StringUtils.join(diagnostics, "\n")); + } else if (report.getLogAggregationStatus() + == LogAggregationStatus.RUNNING_WITH_FAILURE) { + List failureMessages = + logAggregationFailureMessagesForNMs.get(nodeId); + if (failureMessages == null) { + failureMessages = new ArrayList<>(); + logAggregationFailureMessagesForNMs.put(nodeId, failureMessages); + } else { + if (failureMessages.size() + == maxLogAggregationDiagnosticsInMemory) { + failureMessages.remove(0); + } + } + failureMessages.add(report.getDiagnosticMessage()); + } + } + } + + private void updateLogAggregationStatus(NodeId nodeId) { + LogAggregationStatus status = + this.logAggregationStatus.get(nodeId).getLogAggregationStatus(); + if (status.equals(LogAggregationStatus.SUCCEEDED)) { + this.logAggregationSucceed++; + } else if (status.equals(LogAggregationStatus.FAILED)) { + this.logAggregationFailed++; + } + if (this.logAggregationSucceed == this.logAggregationStatus.size()) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.SUCCEEDED; + // Since the log aggregation status for this application for all NMs + // is SUCCEEDED, it means all logs are aggregated successfully. + // We could remove all the cached log aggregation reports + this.logAggregationStatus.clear(); + this.logAggregationDiagnosticsForNMs.clear(); + this.logAggregationFailureMessagesForNMs.clear(); + } else if (this.logAggregationSucceed + this.logAggregationFailed + == this.logAggregationStatus.size()) { + this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED; + // We have collected the log aggregation status for all NMs. + // The log aggregation status is FAILED which means the log + // aggregation fails in some NMs. We are only interested in the + // nodes where the log aggregation is failed. So we could remove + // the log aggregation details for those succeeded NMs + this.logAggregationStatus.entrySet().removeIf(entry -> + entry.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.SUCCEEDED)); + // the log aggregation has finished/failed. + // and the status will not be updated anymore. + this.logAggregationDiagnosticsForNMs.clear(); + } + } + + String getLogAggregationFailureMessagesForNM(NodeId nodeId) { + this.readLock.lock(); + try { + List failureMessages = + this.logAggregationFailureMessagesForNMs.get(nodeId); + if (failureMessages == null || failureMessages.isEmpty()) { + return StringUtils.EMPTY; + } + return StringUtils.join(failureMessages, "\n"); + } finally { + this.readLock.unlock(); + } + } + + void recordLogAggregationStartTime(long time) { + logAggregationStartTime = time; + } + + public boolean isEnabled() { + return logAggregationEnabled; + } + + private boolean hasReportForNodeManager(NodeId nodeId) { + return logAggregationStatus.containsKey(nodeId); + } + + private void addReportForNodeManager(NodeId nodeId, + LogAggregationReport report) { + logAggregationStatus.put(nodeId, report); + } + + public boolean isFinished() { + return isLogAggregationFinished(); + } + + private boolean isThereFailureMessageForNM(NodeId nodeId) { + return logAggregationFailureMessagesForNMs.get(nodeId) != null + && !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty(); + } + + long getLogAggregationStartTime() { + return logAggregationStartTime; + } + + void addReportIfNecessary(NodeId nodeId, ApplicationId applicationId) { + if (!hasReportForNodeManager(nodeId)) { + LogAggregationStatus status = isEnabled() ? LogAggregationStatus.NOT_START + : LogAggregationStatus.DISABLED; + addReportForNodeManager(nodeId, + LogAggregationReport.newInstance(applicationId, status, "")); + } + } +}