YARN-6339. Improve performance for createAndGetApplicationReport. (Yunjiong Zhao via wangda)

(cherry picked from commit cd014d57aa)
This commit is contained in:
Wangda Tan 2017-03-27 13:29:09 -07:00
parent ec5a65a0ca
commit b218676b15
3 changed files with 25 additions and 15 deletions

View File

@ -271,6 +271,8 @@ public static ReservationRequestInterpreter convertFromProtoFormat(
* Log Aggregation Status * Log Aggregation Status
*/ */
private static final String LOG_AGGREGATION_STATUS_PREFIX = "LOG_"; private static final String LOG_AGGREGATION_STATUS_PREFIX = "LOG_";
private static final int LOG_AGGREGATION_STATUS_PREFIX_LEN =
LOG_AGGREGATION_STATUS_PREFIX.length();
public static LogAggregationStatusProto convertToProtoFormat( public static LogAggregationStatusProto convertToProtoFormat(
LogAggregationStatus e) { LogAggregationStatus e) {
return LogAggregationStatusProto.valueOf(LOG_AGGREGATION_STATUS_PREFIX return LogAggregationStatusProto.valueOf(LOG_AGGREGATION_STATUS_PREFIX
@ -279,8 +281,8 @@ public static LogAggregationStatusProto convertToProtoFormat(
public static LogAggregationStatus convertFromProtoFormat( public static LogAggregationStatus convertFromProtoFormat(
LogAggregationStatusProto e) { LogAggregationStatusProto e) {
return LogAggregationStatus.valueOf(e.name().replace( return LogAggregationStatus.valueOf(e.name().substring(
LOG_AGGREGATION_STATUS_PREFIX, "")); LOG_AGGREGATION_STATUS_PREFIX_LEN));
} }
/* /*

View File

@ -35,6 +35,7 @@
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@ -164,8 +165,8 @@ public class RMAppImpl implements RMApp, Recoverable {
private long logAggregationStartTime = 0; private long logAggregationStartTime = 0;
private final long logAggregationStatusTimeout; private final long logAggregationStatusTimeout;
private final Map<NodeId, LogAggregationReport> logAggregationStatus = private final Map<NodeId, LogAggregationReport> logAggregationStatus =
new HashMap<NodeId, LogAggregationReport>(); new ConcurrentHashMap<NodeId, LogAggregationReport>();
private LogAggregationStatus logAggregationStatusForAppReport; private volatile LogAggregationStatus logAggregationStatusForAppReport;
private int logAggregationSucceed = 0; private int logAggregationSucceed = 0;
private int logAggregationFailed = 0; private int logAggregationFailed = 0;
private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs = private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
@ -1515,26 +1516,23 @@ protected Credentials parseCredentials() throws IOException {
public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() { public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
try { try {
this.readLock.lock(); this.readLock.lock();
Map<NodeId, LogAggregationReport> outputs = if (!isLogAggregationFinished() && isAppInFinalState(this) &&
new HashMap<NodeId, LogAggregationReport>(); System.currentTimeMillis() > this.logAggregationStartTime
outputs.putAll(logAggregationStatus); + this.logAggregationStatusTimeout) {
if (!isLogAggregationFinished()) { for (Entry<NodeId, LogAggregationReport> output :
for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) { logAggregationStatus.entrySet()) {
if (!output.getValue().getLogAggregationStatus() if (!output.getValue().getLogAggregationStatus()
.equals(LogAggregationStatus.TIME_OUT) .equals(LogAggregationStatus.TIME_OUT)
&& !output.getValue().getLogAggregationStatus() && !output.getValue().getLogAggregationStatus()
.equals(LogAggregationStatus.SUCCEEDED) .equals(LogAggregationStatus.SUCCEEDED)
&& !output.getValue().getLogAggregationStatus() && !output.getValue().getLogAggregationStatus()
.equals(LogAggregationStatus.FAILED) .equals(LogAggregationStatus.FAILED)) {
&& isAppInFinalState(this)
&& System.currentTimeMillis() > this.logAggregationStartTime
+ this.logAggregationStatusTimeout) {
output.getValue().setLogAggregationStatus( output.getValue().setLogAggregationStatus(
LogAggregationStatus.TIME_OUT); LogAggregationStatus.TIME_OUT);
} }
} }
} }
return outputs; return Collections.unmodifiableMap(logAggregationStatus);
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
} }
@ -1642,11 +1640,17 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() {
// the log aggregation is finished. And the log aggregation status will // the log aggregation is finished. And the log aggregation status will
// not be updated anymore. // not be updated anymore.
if (logFailedCount > 0 && isAppInFinalState(this)) { if (logFailedCount > 0 && isAppInFinalState(this)) {
this.logAggregationStatusForAppReport =
LogAggregationStatus.FAILED;
return LogAggregationStatus.FAILED; return LogAggregationStatus.FAILED;
} else if (logTimeOutCount > 0) { } else if (logTimeOutCount > 0) {
this.logAggregationStatusForAppReport =
LogAggregationStatus.TIME_OUT;
return LogAggregationStatus.TIME_OUT; return LogAggregationStatus.TIME_OUT;
} }
if (isAppInFinalState(this)) { if (isAppInFinalState(this)) {
this.logAggregationStatusForAppReport =
LogAggregationStatus.SUCCEEDED;
return LogAggregationStatus.SUCCEEDED; return LogAggregationStatus.SUCCEEDED;
} }
} else if (logRunningWithFailure > 0) { } else if (logRunningWithFailure > 0) {
@ -1662,7 +1666,9 @@ private boolean isLogAggregationFinished() {
return this.logAggregationStatusForAppReport return this.logAggregationStatusForAppReport
.equals(LogAggregationStatus.SUCCEEDED) .equals(LogAggregationStatus.SUCCEEDED)
|| this.logAggregationStatusForAppReport || this.logAggregationStatusForAppReport
.equals(LogAggregationStatus.FAILED); .equals(LogAggregationStatus.FAILED)
|| this.logAggregationStatusForAppReport
.equals(LogAggregationStatus.TIME_OUT);
} }

View File

@ -408,6 +408,8 @@ public void testGetLogAggregationStatusForAppReport() {
Assert.assertEquals(LogAggregationStatus.TIME_OUT, Assert.assertEquals(LogAggregationStatus.TIME_OUT,
rmApp.getLogAggregationStatusForAppReport()); rmApp.getLogAggregationStatusForAppReport());
rmApp = (RMAppImpl)createRMApp(conf);
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
// If the log aggregation status for all NMs are SUCCEEDED and Application // If the log aggregation status for all NMs are SUCCEEDED and Application
// is at the final state, the log aggregation status for this app will // is at the final state, the log aggregation status for this app will
// return SUCCEEDED // return SUCCEEDED