diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 590c08828f8..8f164d839e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -271,6 +271,8 @@ public class ProtoUtils { * Log Aggregation Status */ private static final String LOG_AGGREGATION_STATUS_PREFIX = "LOG_"; + private static final int LOG_AGGREGATION_STATUS_PREFIX_LEN = + LOG_AGGREGATION_STATUS_PREFIX.length(); public static LogAggregationStatusProto convertToProtoFormat( LogAggregationStatus e) { return LogAggregationStatusProto.valueOf(LOG_AGGREGATION_STATUS_PREFIX @@ -279,8 +281,8 @@ public class ProtoUtils { public static LogAggregationStatus convertFromProtoFormat( LogAggregationStatusProto e) { - return LogAggregationStatus.valueOf(e.name().replace( - LOG_AGGREGATION_STATUS_PREFIX, "")); + return LogAggregationStatus.valueOf(e.name().substring( + LOG_AGGREGATION_STATUS_PREFIX_LEN)); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 879158dbdc3..a4feb5f1c5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -164,8 +165,8 @@ public class RMAppImpl implements RMApp, Recoverable { private long logAggregationStartTime = 0; private final long logAggregationStatusTimeout; private final Map logAggregationStatus = - new HashMap(); - private LogAggregationStatus logAggregationStatusForAppReport; + new ConcurrentHashMap(); + private volatile LogAggregationStatus logAggregationStatusForAppReport; private int logAggregationSucceed = 0; private int logAggregationFailed = 0; private Map> logAggregationDiagnosticsForNMs = @@ -1515,26 +1516,23 @@ public class RMAppImpl implements RMApp, Recoverable { public Map getLogAggregationReportsForApp() { try { this.readLock.lock(); - Map outputs = - new HashMap(); - outputs.putAll(logAggregationStatus); - if (!isLogAggregationFinished()) { - for (Entry output : outputs.entrySet()) { + if (!isLogAggregationFinished() && isAppInFinalState(this) && + System.currentTimeMillis() > this.logAggregationStartTime + + this.logAggregationStatusTimeout) { + for (Entry output : + logAggregationStatus.entrySet()) { if (!output.getValue().getLogAggregationStatus() .equals(LogAggregationStatus.TIME_OUT) && !output.getValue().getLogAggregationStatus() .equals(LogAggregationStatus.SUCCEEDED) && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.FAILED) - && isAppInFinalState(this) - && System.currentTimeMillis() > this.logAggregationStartTime - + this.logAggregationStatusTimeout) { + .equals(LogAggregationStatus.FAILED)) { output.getValue().setLogAggregationStatus( LogAggregationStatus.TIME_OUT); } } } - return outputs; + return Collections.unmodifiableMap(logAggregationStatus); } finally { this.readLock.unlock(); } @@ -1642,11 +1640,17 @@ public class RMAppImpl implements RMApp, Recoverable { // the log aggregation is finished. And the log aggregation status will // not be updated anymore. if (logFailedCount > 0 && isAppInFinalState(this)) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.FAILED; return LogAggregationStatus.FAILED; } else if (logTimeOutCount > 0) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.TIME_OUT; return LogAggregationStatus.TIME_OUT; } if (isAppInFinalState(this)) { + this.logAggregationStatusForAppReport = + LogAggregationStatus.SUCCEEDED; return LogAggregationStatus.SUCCEEDED; } } else if (logRunningWithFailure > 0) { @@ -1662,7 +1666,9 @@ public class RMAppImpl implements RMApp, Recoverable { return this.logAggregationStatusForAppReport .equals(LogAggregationStatus.SUCCEEDED) || this.logAggregationStatusForAppReport - .equals(LogAggregationStatus.FAILED); + .equals(LogAggregationStatus.FAILED) + || this.logAggregationStatusForAppReport + .equals(LogAggregationStatus.TIME_OUT); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 087199da192..0c1777b7127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -408,6 +408,8 @@ public class TestRMAppLogAggregationStatus { Assert.assertEquals(LogAggregationStatus.TIME_OUT, rmApp.getLogAggregationStatusForAppReport()); + rmApp = (RMAppImpl)createRMApp(conf); + rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL)); // If the log aggregation status for all NMs are SUCCEEDED and Application // is at the final state, the log aggregation status for this app will // return SUCCEEDED