YARN-5965. Retrospect ApplicationReport#getApplicationTimeouts. Contributed by Rohith Sharma K S

This commit is contained in:
Sunil 2016-12-08 00:27:25 +05:30
parent c73e08a6da
commit ab923a53fc
7 changed files with 100 additions and 76 deletions

View File

@ -25,7 +25,7 @@
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.util.Records;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@ -451,10 +451,10 @@ public abstract void setLogAggregationStatus(
@Public
@Unstable
public abstract List<ApplicationTimeout> getApplicationTimeouts();
public abstract Map<ApplicationTimeoutType, ApplicationTimeout> getApplicationTimeouts();
@Private
@Unstable
public abstract void setApplicationTimeouts(
List<ApplicationTimeout> timeouts);
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts);
}

View File

@ -214,7 +214,12 @@ message ApplicationReportProto {
optional PriorityProto priority = 23;
optional string appNodeLabelExpression = 24;
optional string amNodeLabelExpression = 25;
repeated ApplicationTimeoutProto application_timeouts = 26;
repeated AppTimeoutsMapProto appTimeouts = 26;
}
message AppTimeoutsMapProto {
optional ApplicationTimeoutTypeProto application_timeout_type = 1;
optional ApplicationTimeoutProto application_timeout = 2;
}
message ApplicationTimeoutProto {

View File

@ -714,7 +714,8 @@ private int printApplicationReport(String applicationId)
appReportStr.println(appReport.getAppNodeLabelExpression());
appReportStr.print("\tAM container Node Label Expression : ");
appReportStr.println(appReport.getAmNodeLabelExpression());
for (ApplicationTimeout timeout : appReport.getApplicationTimeouts()) {
for (ApplicationTimeout timeout : appReport.getApplicationTimeouts()
.values()) {
appReportStr.print("\tTimeoutType : " + timeout.getTimeoutType());
appReportStr.print("\tExpiryTime : " + timeout.getExpiryTime());
appReportStr.println(

View File

@ -130,8 +130,8 @@ public void testGetApplicationReport() throws Exception {
newApplicationReport.setPriority(Priority.newInstance(0));
ApplicationTimeout timeout = ApplicationTimeout
.newInstance(ApplicationTimeoutType.LIFETIME, "UNLIMITED", -1);
newApplicationReport
.setApplicationTimeouts(Collections.singletonList(timeout));
newApplicationReport.setApplicationTimeouts(
Collections.singletonMap(timeout.getTimeoutType(), timeout));
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport);
@ -2104,7 +2104,8 @@ public void testUpdateApplicationTimeout() throws Exception {
"N/A", 0.53789f, "YARN", null);
ApplicationTimeout timeout = ApplicationTimeout
.newInstance(ApplicationTimeoutType.LIFETIME, "N/A", -1);
appReport.setApplicationTimeouts(Collections.singletonList(timeout));
appReport.setApplicationTimeouts(
Collections.singletonMap(timeout.getTimeoutType(), timeout));
when(client.getApplicationReport(any(ApplicationId.class)))
.thenReturn(appReport);

View File

@ -26,11 +26,13 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.proto.YarnProtos.AppTimeoutsMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto;
@ -44,10 +46,11 @@
import com.google.protobuf.TextFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Private
@ -63,7 +66,7 @@ public class ApplicationReportPBImpl extends ApplicationReport {
private Token amRmToken = null;
private Set<String> applicationTags = null;
private Priority priority = null;
private List<ApplicationTimeout> applicationTimeoutList = null;
private Map<ApplicationTimeoutType, ApplicationTimeout> applicationTimeouts = null;
public ApplicationReportPBImpl() {
builder = ApplicationReportProto.newBuilder();
@ -498,8 +501,8 @@ private void mergeLocalToBuilder() {
builder.getPriority())) {
builder.setPriority(convertToProtoFormat(this.priority));
}
if (this.applicationTimeoutList != null) {
addLocalApplicationTimeoutToProto();
if (this.applicationTimeouts != null) {
addApplicationTimeouts();
}
}
@ -679,61 +682,38 @@ public void setAmNodeLabelExpression(String amNodeLabelExpression) {
}
@Override
public List<ApplicationTimeout> getApplicationTimeouts() {
initLocalApplicationsList();
return this.applicationTimeoutList;
public Map<ApplicationTimeoutType, ApplicationTimeout> getApplicationTimeouts() {
initApplicationTimeout();
return this.applicationTimeouts;
}
private void initLocalApplicationsList() {
if (this.applicationTimeoutList != null) {
@Override
public void setApplicationTimeouts(
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts) {
if (timeouts == null) {
return;
}
initApplicationTimeout();
this.applicationTimeouts.clear();
this.applicationTimeouts.putAll(timeouts);
}
private void initApplicationTimeout() {
if (this.applicationTimeouts != null) {
return;
}
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
List<ApplicationTimeoutProto> list = p.getApplicationTimeoutsList();
this.applicationTimeoutList = new ArrayList<ApplicationTimeout>();
for (ApplicationTimeoutProto a : list) {
this.applicationTimeoutList.add(convertFromProtoFormat(a));
List<AppTimeoutsMapProto> lists = p.getAppTimeoutsList();
this.applicationTimeouts =
new HashMap<ApplicationTimeoutType, ApplicationTimeout>(lists.size());
for (AppTimeoutsMapProto timeoutProto : lists) {
this.applicationTimeouts.put(
ProtoUtils
.convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()),
convertFromProtoFormat(timeoutProto.getApplicationTimeout()));
}
}
private void addLocalApplicationTimeoutToProto() {
maybeInitBuilder();
builder.clearApplicationTimeouts();
if (applicationTimeoutList == null) {
return;
}
Iterable<ApplicationTimeoutProto> iterable =
new Iterable<ApplicationTimeoutProto>() {
@Override
public Iterator<ApplicationTimeoutProto> iterator() {
return new Iterator<ApplicationTimeoutProto>() {
private Iterator<ApplicationTimeout> iter =
applicationTimeoutList.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ApplicationTimeoutProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllApplicationTimeouts(iterable);
}
private ApplicationTimeoutPBImpl convertFromProtoFormat(
ApplicationTimeoutProto p) {
return new ApplicationTimeoutPBImpl(p);
@ -743,12 +723,45 @@ private ApplicationTimeoutProto convertToProtoFormat(ApplicationTimeout t) {
return ((ApplicationTimeoutPBImpl) t).getProto();
}
@Override
public void setApplicationTimeouts(List<ApplicationTimeout> timeouts) {
private void addApplicationTimeouts() {
maybeInitBuilder();
if (timeouts == null) {
builder.clearApplicationTimeouts();
builder.clearAppTimeouts();
if (applicationTimeouts == null) {
return;
}
this.applicationTimeoutList = timeouts;
Iterable<? extends AppTimeoutsMapProto> values =
new Iterable<AppTimeoutsMapProto>() {
@Override
public Iterator<AppTimeoutsMapProto> iterator() {
return new Iterator<AppTimeoutsMapProto>() {
private Iterator<ApplicationTimeoutType> iterator =
applicationTimeouts.keySet().iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public AppTimeoutsMapProto next() {
ApplicationTimeoutType key = iterator.next();
return AppTimeoutsMapProto.newBuilder()
.setApplicationTimeout(
convertToProtoFormat(applicationTimeouts.get(key)))
.setApplicationTimeoutType(
ProtoUtils.convertToProtoFormat(key))
.build();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
this.builder.addAllAppTimeouts(values);
}
}

View File

@ -790,10 +790,17 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
long timeoutInMillis = applicationTimeouts
.get(ApplicationTimeoutType.LIFETIME).longValue();
timeout.setExpiryTime(Times.formatISO8601(timeoutInMillis));
timeout.setRemainingTime(
Math.max((timeoutInMillis - systemClock.getTime()) / 1000, 0));
if (isAppInCompletedStates()) {
// if application configured with timeout and finished before timeout
// happens then remaining time should not be calculated.
timeout.setRemainingTime(0);
} else {
timeout.setRemainingTime(
Math.max((timeoutInMillis - systemClock.getTime()) / 1000, 0));
}
}
report.setApplicationTimeouts(Collections.singletonList(timeout));
report.setApplicationTimeouts(
Collections.singletonMap(timeout.getTimeoutType(), timeout));
return report;
} finally {
this.readLock.unlock();

View File

@ -24,7 +24,6 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -136,15 +135,13 @@ public void testApplicationLifetimeMonitor() throws Exception {
GetApplicationReportRequest appRequest =
recordFactory.newRecordInstance(GetApplicationReportRequest.class);
appRequest.setApplicationId(app2.getApplicationId());
List<ApplicationTimeout> applicationTimeoutList = rm.getRMContext()
.getClientRMService().getApplicationReport(appRequest)
Map<ApplicationTimeoutType, ApplicationTimeout> appTimeouts = rm
.getRMContext().getClientRMService().getApplicationReport(appRequest)
.getApplicationReport().getApplicationTimeouts();
Assert.assertTrue("Application Timeout list are empty.",
!applicationTimeoutList.isEmpty());
ApplicationTimeout timeout = applicationTimeoutList.iterator().next();
Assert.assertEquals("Application timeout Type is incorrect.",
ApplicationTimeoutType.LIFETIME.toString(),
timeout.getTimeoutType().toString());
Assert.assertTrue("Application Timeout are empty.",
!appTimeouts.isEmpty());
ApplicationTimeout timeout =
appTimeouts.get(ApplicationTimeoutType.LIFETIME);
Assert.assertEquals("Application timeout string is incorrect.",
formatISO8601, timeout.getExpiryTime());
Assert.assertTrue("Application remaining time is incorrect",