YARN-1402. Update related Web UI and CLI with exposing client API to check log aggregation status. Contributed by Xuan Gong.

(cherry picked from commit 1db355a875)
This commit is contained in:
Junping Du 2015-04-17 13:18:59 -07:00
parent dbe1cec0da
commit b244701fad
20 changed files with 346 additions and 49 deletions

View File

@ -36,6 +36,9 @@ Release 2.8.0 - UNRELEASED
YARN-3354. Add node label expression in ContainerTokenIdentifier to support
RM recovery. (Wangda Tan via jianhe)
YARN-1402. Update related Web UI and CLI with exposing client API to check
log aggregation status. (Xuan Gong via junping_du)
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA

View File

@ -361,5 +361,17 @@ public abstract class ApplicationReport {
@Public
@Stable
public abstract Token getAMRMToken();
/**
* Get log aggregation status for the application
* @return Application's log aggregation status
*/
@Public
@Stable
public abstract LogAggregationStatus getLogAggregationStatus();
@Private
@Unstable
public abstract void setLogAggregationStatus(
LogAggregationStatus logAggregationStatus);
}

View File

@ -16,16 +16,40 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records;
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* <p>Status of Log aggregation.</p>
*/
public enum LogAggregationStatus {
/** Log Aggregation is Disabled. */
DISABLED,
/** Log Aggregation does not Start. */
NOT_START,
/** Log Aggregation is Running. */
RUNNING,
FINISHED,
/**
* Log Aggregation is Succeeded. All of the logs have been aggregated
* successfully.
*/
SUCCEEDED,
/**
* Log Aggregation is completed. But at least one of the logs have not been
* aggregated.
*/
FAILED,
/**
* The application is finished, but the log aggregation status is not updated
* for a long time.
* @see YarnConfiguration#LOG_AGGREGATION_STATUS_TIME_OUT_MS
*/
TIME_OUT
}

View File

@ -194,6 +194,16 @@ message ApplicationReportProto {
optional string applicationType = 18;
optional hadoop.common.TokenProto am_rm_token = 19;
repeated string applicationTags = 20;
optional LogAggregationStatusProto log_aggregation_status = 21;
}
enum LogAggregationStatusProto {
LOG_DISABLED = 1;
LOG_NOT_START = 2;
LOG_RUNNING = 3;
LOG_SUCCEEDED = 4;
LOG_FAILED = 5;
LOG_TIME_OUT = 6;
}
message ApplicationAttemptReportProto {

View File

@ -530,6 +530,9 @@ public class ApplicationCLI extends YarnCLI {
} else {
appReportStr.println("N/A");
}
appReportStr.print("\tLog Aggregation Status : ");
appReportStr.println(appReport.getLogAggregationStatus() == null ? "N/A"
: appReport.getLogAggregationStatus());
appReportStr.print("\tDiagnostics : ");
appReportStr.print(appReport.getDiagnostics());
} else {

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
@ -104,6 +105,7 @@ public class TestYarnCLI {
YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
FinalApplicationStatus.SUCCEEDED, usageReport, "N/A", 0.53789f, "YARN",
null);
newApplicationReport.setLogAggregationStatus(LogAggregationStatus.SUCCEEDED);
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport);
int result = cli.run(new String[] { "application", "-status", applicationId.toString() });
@ -127,6 +129,7 @@ public class TestYarnCLI {
pw.println("\tAM Host : host");
pw.println("\tAggregate Resource Allocation : " +
(i == 0 ? "N/A" : "123456 MB-seconds, 4567 vcore-seconds"));
pw.println("\tLog Aggregation Status : SUCCEEDED");
pw.println("\tDiagnostics : diagnostics");
pw.close();
String appReportStr = baos.toString("UTF-8");

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import com.google.protobuf.TextFormat;
@ -548,4 +550,35 @@ public class ApplicationReportPBImpl extends ApplicationReport {
private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl)t).getProto();
}
@Override
public LogAggregationStatus getLogAggregationStatus() {
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasLogAggregationStatus()) {
return null;
}
return convertFromProtoFormat(p.getLogAggregationStatus());
}
@Override
public void setLogAggregationStatus(
LogAggregationStatus logAggregationStatus) {
maybeInitBuilder();
if (logAggregationStatus == null) {
builder.clearLogAggregationStatus();
return;
}
builder.setLogAggregationStatus(
convertToProtoFormat(logAggregationStatus));
}
private LogAggregationStatus convertFromProtoFormat(
LogAggregationStatusProto s) {
return ProtoUtils.convertFromProtoFormat(s);
}
private LogAggregationStatusProto
convertToProtoFormat(LogAggregationStatus s) {
return ProtoUtils.convertToProtoFormat(s);
}
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueACLProto;
@ -253,4 +255,19 @@ public class ProtoUtils {
return ReservationRequestInterpreter.valueOf(e.name());
}
/*
* Log Aggregation Status
*/
private static final String LOG_AGGREGATION_STATUS_PREFIX = "LOG_";
public static LogAggregationStatusProto convertToProtoFormat(
LogAggregationStatus e) {
return LogAggregationStatusProto.valueOf(LOG_AGGREGATION_STATUS_PREFIX
+ e.name());
}
public static LogAggregationStatus convertFromProtoFormat(
LogAggregationStatusProto e) {
return LogAggregationStatus.valueOf(e.name().replace(
LOG_AGGREGATION_STATUS_PREFIX, ""));
}
}

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.util.Records;
/**

View File

@ -21,16 +21,17 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.LogAggregationStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
import com.google.protobuf.TextFormat;
@ -43,8 +44,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
LogAggregationReportProto.Builder builder = null;
boolean viaProto = false;
private static final String LOGAGGREGATION_STATUS_PREFIX = "LOG_";
private ApplicationId applicationId;
private NodeId nodeId;
@ -166,14 +165,12 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
private LogAggregationStatus convertFromProtoFormat(
LogAggregationStatusProto s) {
return LogAggregationStatus.valueOf(s.name().replace(
LOGAGGREGATION_STATUS_PREFIX, ""));
return ProtoUtils.convertFromProtoFormat(s);
}
private LogAggregationStatusProto
convertToProtoFormat(LogAggregationStatus s) {
return LogAggregationStatusProto.valueOf(LOGAGGREGATION_STATUS_PREFIX
+ s.name());
return ProtoUtils.convertToProtoFormat(s);
}
@Override

View File

@ -52,13 +52,4 @@ message NodeHealthStatusProto {
message VersionProto {
optional int32 major_version = 1;
optional int32 minor_version = 2;
}
enum LogAggregationStatusProto {
LOG_DISABLED = 1;
LOG_NOT_START = 2;
LOG_RUNNING = 3;
LOG_FINISHED = 4;
LOG_TIME_OUT = 5;
}
}

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -58,7 +59,6 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@ -347,7 +347,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
report.setDiagnosticMessage(diagnosticMessage);
if (appFinished) {
report.setLogAggregationStatus(renameTemporaryLogFileFailed
? LogAggregationStatus.FAILED : LogAggregationStatus.FINISHED);
? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED);
} else {
report.setLogAggregationStatus(LogAggregationStatus.RUNNING);
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -245,4 +246,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
ResourceRequest getAMResourceRequest();
Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp();
LogAggregationStatus getLogAggregationStatusForAppReport();
}

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -64,7 +65,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@ -151,6 +151,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private final long logAggregationStatusTimeout;
private final Map<NodeId, LogAggregationReport> logAggregationStatus =
new HashMap<NodeId, LogAggregationReport>();
private LogAggregationStatus logAggregationStatusForAppReport;
// These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling;
@ -578,6 +579,7 @@ public class RMAppImpl implements RMApp, Recoverable {
String trackingUrl = UNAVAILABLE;
String host = UNAVAILABLE;
String origTrackingUrl = UNAVAILABLE;
LogAggregationStatus logAggregationStatus = null;
int rpcPort = -1;
ApplicationResourceUsageReport appUsageReport =
RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
@ -608,6 +610,7 @@ public class RMAppImpl implements RMApp, Recoverable {
rpcPort = this.currentAttempt.getRpcPort();
appUsageReport = currentAttempt.getApplicationResourceUsageReport();
progress = currentAttempt.getProgress();
logAggregationStatus = this.getLogAggregationStatusForAppReport();
}
diags = this.diagnostics.toString();
@ -635,13 +638,15 @@ public class RMAppImpl implements RMApp, Recoverable {
DUMMY_APPLICATION_ATTEMPT_NUMBER);
}
return BuilderUtils.newApplicationReport(this.applicationId,
currentApplicationAttemptId, this.user, this.queue,
this.name, host, rpcPort, clientToAMToken,
ApplicationReport report = BuilderUtils.newApplicationReport(
this.applicationId, currentApplicationAttemptId, this.user,
this.queue, this.name, host, rpcPort, clientToAMToken,
createApplicationState(), diags,
trackingUrl, this.startTime, this.finishTime, finishState,
appUsageReport, origTrackingUrl, progress, this.applicationType,
amrmToken, applicationTags);
report.setLogAggregationStatus(logAggregationStatus);
return report;
} finally {
this.readLock.unlock();
}
@ -827,11 +832,13 @@ public class RMAppImpl implements RMApp, Recoverable {
// otherwise, add it to ranNodes for further process
app.ranNodes.add(nodeAddedEvent.getNodeId());
app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent
.getNodeId(), app.logAggregationEnabled
? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED,
""));
if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent
.getNodeId(), app.logAggregationEnabled
? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED,
""));
}
};
}
@ -1398,7 +1405,9 @@ public class RMAppImpl implements RMApp, Recoverable {
if (!output.getValue().getLogAggregationStatus()
.equals(LogAggregationStatus.TIME_OUT)
&& !output.getValue().getLogAggregationStatus()
.equals(LogAggregationStatus.FINISHED)
.equals(LogAggregationStatus.SUCCEEDED)
&& !output.getValue().getLogAggregationStatus()
.equals(LogAggregationStatus.FAILED)
&& isAppInFinalState(this)
&& System.currentTimeMillis() > this.logAggregationStartTime
+ this.logAggregationStatusTimeout) {
@ -1423,7 +1432,9 @@ public class RMAppImpl implements RMApp, Recoverable {
if (curReport.getLogAggregationStatus().equals(
LogAggregationStatus.TIME_OUT)) {
if (report.getLogAggregationStatus().equals(
LogAggregationStatus.FINISHED)) {
LogAggregationStatus.SUCCEEDED)
|| report.getLogAggregationStatus().equals(
LogAggregationStatus.FAILED)) {
curReport.setLogAggregationStatus(report
.getLogAggregationStatus());
}
@ -1444,4 +1455,70 @@ public class RMAppImpl implements RMApp, Recoverable {
this.writeLock.unlock();
}
}
@Override
public LogAggregationStatus getLogAggregationStatusForAppReport() {
if (!logAggregationEnabled) {
return LogAggregationStatus.DISABLED;
}
if (this.logAggregationStatusForAppReport == LogAggregationStatus.FAILED
|| this.logAggregationStatusForAppReport == LogAggregationStatus.SUCCEEDED) {
return this.logAggregationStatusForAppReport;
}
try {
this.readLock.lock();
Map<NodeId, LogAggregationReport> reports =
getLogAggregationReportsForApp();
if (reports.size() == 0) {
return null;
}
int logNotStartCount = 0;
int logCompletedCount = 0;
int logTimeOutCount = 0;
int logFailedCount = 0;
for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) {
switch (report.getValue().getLogAggregationStatus()) {
case NOT_START:
logNotStartCount++;
break;
case SUCCEEDED:
logCompletedCount++;
break;
case FAILED:
logFailedCount++;
logCompletedCount++;
break;
case TIME_OUT:
logTimeOutCount++;
logCompletedCount++;
break;
default:
break;
}
}
if (logNotStartCount == reports.size()) {
return LogAggregationStatus.NOT_START;
} else if (logCompletedCount == reports.size()) {
// We should satisfy two condition in order to return SUCCEEDED or FAILED
// 1) make sure the application is in final state
// 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT
// The SUCCEEDED/FAILED status is the final status which means
// the log aggregation is finished. And the log aggregation status will
// not be updated anymore.
if (logFailedCount > 0 && isAppInFinalState(this)) {
this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
return LogAggregationStatus.FAILED;
} else if (logTimeOutCount > 0) {
return LogAggregationStatus.TIME_OUT;
}
if (isAppInFinalState(this)) {
this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED;
return LogAggregationStatus.SUCCEEDED;
}
}
return LogAggregationStatus.RUNNING;
} finally {
this.readLock.unlock();
}
}
}

View File

@ -30,10 +30,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.util.Apps;
@ -93,15 +93,16 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
.td("Log Aggregation does not Start.")._();
table_description.tr().td(LogAggregationStatus.RUNNING.name())
.td("Log Aggregation is Running.")._();
table_description.tr().td(LogAggregationStatus.FINISHED.name())
.td("Log Aggregation is Finished. All of the logs have been "
table_description.tr().td(LogAggregationStatus.SUCCEEDED.name())
.td("Log Aggregation is Succeeded. All of the logs have been "
+ "aggregated successfully.")._();
table_description.tr().td(LogAggregationStatus.FAILED.name())
.td("Log Aggregation is Failed. At least one of the logs "
+ "have not been aggregated.")._();
table_description.tr().td(LogAggregationStatus.TIME_OUT.name())
.td("Does not get the Log aggregation status for a long time. "
+ "Not sure what is the current Log Aggregation Status.")._();
.td("The application is finished, but the log aggregation status is "
+ "not updated for a long time. Not sure whether the log aggregation "
+ "is finished or not.")._();
table_description._();
div_description._();

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -94,6 +95,8 @@ public class AppInfo {
protected List<ResourceRequest> resourceRequests;
protected LogAggregationStatus logAggregationStatus;
public AppInfo() {
} // JAXB needs this
@ -141,7 +144,7 @@ public class AppInfo {
this.finishedTime = app.getFinishTime();
this.elapsedTime = Times.elapsed(app.getStartTime(),
app.getFinishTime());
this.logAggregationStatus = app.getLogAggregationStatusForAppReport();
RMAppAttempt attempt = app.getCurrentAppAttempt();
if (attempt != null) {
Container masterContainer = attempt.getMasterContainer();
@ -314,4 +317,8 @@ public class AppInfo {
public List<ResourceRequest> getResourceRequests() {
return this.resourceRequests;
}
public LogAggregationStatus getLogAggregationStatus() {
return this.logAggregationStatus;
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -196,6 +197,11 @@ public abstract class MockAsm extends MockApps {
public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public LogAggregationStatus getLogAggregationStatusForAppReport() {
throw new UnsupportedOperationException("Not supported yet.");
}
}
public static RMApp newApplication(int i) {

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -38,7 +39,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
@ -267,7 +267,7 @@ public class TestRMAppLogAggregationStatus {
// Finally, node1 finished its log aggregation and sent out its final
// log aggregation status. The log aggregation status for node1 should
// be changed from TIME_OUT to Finished
// be changed from TIME_OUT to SUCCEEDED
Map<ApplicationId, LogAggregationReport> node1ReportForApp3 =
new HashMap<ApplicationId, LogAggregationReport>();
String messageForNode1_3 =
@ -275,7 +275,7 @@ public class TestRMAppLogAggregationStatus {
+ System.currentTimeMillis();
LogAggregationReport report1_3 =
LogAggregationReport.newInstance(appId, nodeId1,
LogAggregationStatus.FINISHED, messageForNode1_3);
LogAggregationStatus.SUCCEEDED, messageForNode1_3);
node1ReportForApp3.put(appId, report1_3);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
.newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
@ -288,7 +288,7 @@ public class TestRMAppLogAggregationStatus {
for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
.entrySet()) {
if (report.getKey().equals(node1.getNodeID())) {
Assert.assertEquals(LogAggregationStatus.FINISHED, report.getValue()
Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue()
.getLogAggregationStatus());
Assert.assertEquals(messageForNode1_1 + messageForNode1_2
+ messageForNode1_3, report.getValue().getDiagnosticMessage());
@ -303,6 +303,104 @@ public class TestRMAppLogAggregationStatus {
}
}
@Test (timeout = 10000)
public void testGetLogAggregationStatusForAppReport() {
YarnConfiguration conf = new YarnConfiguration();
// Disable the log aggregation
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
RMAppImpl rmApp = (RMAppImpl)createRMApp(conf);
// The log aggregation status should be DISABLED.
Assert.assertEquals(LogAggregationStatus.DISABLED,
rmApp.getLogAggregationStatusForAppReport());
// Enable the log aggregation
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
rmApp = (RMAppImpl)createRMApp(conf);
// If we do not know any NodeManagers for this application ,
// the log aggregation status will return null
Assert.assertNull(rmApp.getLogAggregationStatusForAppReport());
NodeId nodeId1 = NodeId.newInstance("localhost", 1111);
NodeId nodeId2 = NodeId.newInstance("localhost", 2222);
NodeId nodeId3 = NodeId.newInstance("localhost", 3333);
NodeId nodeId4 = NodeId.newInstance("localhost", 4444);
// If the log aggregation status for all NMs are NOT_START,
// the log aggregation status for this app will return NOT_START
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
Assert.assertEquals(LogAggregationStatus.NOT_START,
rmApp.getLogAggregationStatusForAppReport());
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
Assert.assertEquals(LogAggregationStatus.RUNNING,
rmApp.getLogAggregationStatusForAppReport());
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
// If at least of one log aggregation status for one NM is TIME_OUT,
// others are SUCCEEDED, the log aggregation status for this app will
// return TIME_OUT
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.TIME_OUT,
rmApp.getLogAggregationStatusForAppReport());
// If the log aggregation status for all NMs are SUCCEEDED and Application
// is at the final state, the log aggregation status for this app will
// return SUCCEEDED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.SUCCEEDED,
rmApp.getLogAggregationStatusForAppReport());
rmApp = (RMAppImpl)createRMApp(conf);
rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
// If at least of one log aggregation status for one NM is FAILED,
// others are either SUCCEEDED or TIME_OUT, and this application is
// at the final state, the log aggregation status for this app
// will return FAILED
rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
Assert.assertEquals(LogAggregationStatus.FAILED,
rmApp.getLogAggregationStatusForAppReport());
}
private RMApp createRMApp(Configuration conf) {
ApplicationSubmissionContext submissionContext =
ApplicationSubmissionContext.newInstance(appId, "test", "default",

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -277,4 +278,9 @@ public class MockRMApp implements RMApp {
public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public LogAggregationStatus getLogAggregationStatusForAppReport() {
return null;
}
}

View File

@ -1307,14 +1307,15 @@ public class TestRMWebServicesApps extends JerseyTestBase {
WebServicesTestUtils.getXmlInt(element, "preemptedResourceMB"),
WebServicesTestUtils.getXmlInt(element, "preemptedResourceVCores"),
WebServicesTestUtils.getXmlInt(element, "numNonAMContainerPreempted"),
WebServicesTestUtils.getXmlInt(element, "numAMContainerPreempted"));
WebServicesTestUtils.getXmlInt(element, "numAMContainerPreempted"),
WebServicesTestUtils.getXmlString(element, "logAggregationStatus"));
}
}
public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
Exception {
assertEquals("incorrect number of elements", 27, info.length());
assertEquals("incorrect number of elements", 28, info.length());
verifyAppInfoGeneric(app, info.getString("id"), info.getString("user"),
info.getString("name"), info.getString("applicationType"),
@ -1329,7 +1330,8 @@ public class TestRMWebServicesApps extends JerseyTestBase {
info.getInt("preemptedResourceMB"),
info.getInt("preemptedResourceVCores"),
info.getInt("numNonAMContainerPreempted"),
info.getInt("numAMContainerPreempted"));
info.getInt("numAMContainerPreempted"),
info.getString("logAggregationStatus"));
}
public void verifyAppInfoGeneric(RMApp app, String id, String user,
@ -1339,7 +1341,8 @@ public class TestRMWebServicesApps extends JerseyTestBase {
long elapsedTime, String amHostHttpAddress, String amContainerLogs,
int allocatedMB, int allocatedVCores, int numContainers,
int preemptedResourceMB, int preemptedResourceVCores,
int numNonAMContainerPreempted, int numAMContainerPreempted) throws JSONException,
int numNonAMContainerPreempted, int numAMContainerPreempted,
String logAggregationStatus) throws JSONException,
Exception {
WebServicesTestUtils.checkStringMatch("id", app.getApplicationId()
@ -1386,6 +1389,9 @@ public class TestRMWebServicesApps extends JerseyTestBase {
assertEquals("numAMContainerPreempted doesn't match", app
.getRMAppMetrics().getNumAMContainersPreempted(),
numAMContainerPreempted);
assertEquals("Log aggregation Status doesn't match", app
.getLogAggregationStatusForAppReport().toString(),
logAggregationStatus);
}
@Test