YARN-5965. Retrospect ApplicationReport#getApplicationTimeouts. Contributed by Rohith Sharma K S

(cherry picked from commit ab923a53fc)
This commit is contained in:
Sunil 2016-12-08 00:27:25 +05:30 committed by Sunil G
parent d8b4961515
commit a636a87270
7 changed files with 100 additions and 76 deletions

View File

@ -25,7 +25,7 @@
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import java.util.List; import java.util.Map;
import java.util.Set; import java.util.Set;
/** /**
@ -451,10 +451,10 @@ public abstract void setLogAggregationStatus(
@Public @Public
@Unstable @Unstable
public abstract List<ApplicationTimeout> getApplicationTimeouts(); public abstract Map<ApplicationTimeoutType, ApplicationTimeout> getApplicationTimeouts();
@Private @Private
@Unstable @Unstable
public abstract void setApplicationTimeouts( public abstract void setApplicationTimeouts(
List<ApplicationTimeout> timeouts); Map<ApplicationTimeoutType, ApplicationTimeout> timeouts);
} }

View File

@ -214,7 +214,12 @@ message ApplicationReportProto {
optional PriorityProto priority = 23; optional PriorityProto priority = 23;
optional string appNodeLabelExpression = 24; optional string appNodeLabelExpression = 24;
optional string amNodeLabelExpression = 25; optional string amNodeLabelExpression = 25;
repeated ApplicationTimeoutProto application_timeouts = 26; repeated AppTimeoutsMapProto appTimeouts = 26;
}
message AppTimeoutsMapProto {
optional ApplicationTimeoutTypeProto application_timeout_type = 1;
optional ApplicationTimeoutProto application_timeout = 2;
} }
message ApplicationTimeoutProto { message ApplicationTimeoutProto {

View File

@ -714,7 +714,8 @@ private int printApplicationReport(String applicationId)
appReportStr.println(appReport.getAppNodeLabelExpression()); appReportStr.println(appReport.getAppNodeLabelExpression());
appReportStr.print("\tAM container Node Label Expression : "); appReportStr.print("\tAM container Node Label Expression : ");
appReportStr.println(appReport.getAmNodeLabelExpression()); appReportStr.println(appReport.getAmNodeLabelExpression());
for (ApplicationTimeout timeout : appReport.getApplicationTimeouts()) { for (ApplicationTimeout timeout : appReport.getApplicationTimeouts()
.values()) {
appReportStr.print("\tTimeoutType : " + timeout.getTimeoutType()); appReportStr.print("\tTimeoutType : " + timeout.getTimeoutType());
appReportStr.print("\tExpiryTime : " + timeout.getExpiryTime()); appReportStr.print("\tExpiryTime : " + timeout.getExpiryTime());
appReportStr.println( appReportStr.println(

View File

@ -130,8 +130,8 @@ public void testGetApplicationReport() throws Exception {
newApplicationReport.setPriority(Priority.newInstance(0)); newApplicationReport.setPriority(Priority.newInstance(0));
ApplicationTimeout timeout = ApplicationTimeout ApplicationTimeout timeout = ApplicationTimeout
.newInstance(ApplicationTimeoutType.LIFETIME, "UNLIMITED", -1); .newInstance(ApplicationTimeoutType.LIFETIME, "UNLIMITED", -1);
newApplicationReport newApplicationReport.setApplicationTimeouts(
.setApplicationTimeouts(Collections.singletonList(timeout)); Collections.singletonMap(timeout.getTimeoutType(), timeout));
when(client.getApplicationReport(any(ApplicationId.class))).thenReturn( when(client.getApplicationReport(any(ApplicationId.class))).thenReturn(
newApplicationReport); newApplicationReport);
@ -2104,7 +2104,8 @@ public void testUpdateApplicationTimeout() throws Exception {
"N/A", 0.53789f, "YARN", null); "N/A", 0.53789f, "YARN", null);
ApplicationTimeout timeout = ApplicationTimeout ApplicationTimeout timeout = ApplicationTimeout
.newInstance(ApplicationTimeoutType.LIFETIME, "N/A", -1); .newInstance(ApplicationTimeoutType.LIFETIME, "N/A", -1);
appReport.setApplicationTimeouts(Collections.singletonList(timeout)); appReport.setApplicationTimeouts(
Collections.singletonMap(timeout.getTimeoutType(), timeout));
when(client.getApplicationReport(any(ApplicationId.class))) when(client.getApplicationReport(any(ApplicationId.class)))
.thenReturn(appReport); .thenReturn(appReport);

View File

@ -26,11 +26,13 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout; import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.proto.YarnProtos.AppTimeoutsMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto;
@ -44,10 +46,11 @@
import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat;
import java.util.ArrayList; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
@Private @Private
@ -63,7 +66,7 @@ public class ApplicationReportPBImpl extends ApplicationReport {
private Token amRmToken = null; private Token amRmToken = null;
private Set<String> applicationTags = null; private Set<String> applicationTags = null;
private Priority priority = null; private Priority priority = null;
private List<ApplicationTimeout> applicationTimeoutList = null; private Map<ApplicationTimeoutType, ApplicationTimeout> applicationTimeouts = null;
public ApplicationReportPBImpl() { public ApplicationReportPBImpl() {
builder = ApplicationReportProto.newBuilder(); builder = ApplicationReportProto.newBuilder();
@ -498,8 +501,8 @@ private void mergeLocalToBuilder() {
builder.getPriority())) { builder.getPriority())) {
builder.setPriority(convertToProtoFormat(this.priority)); builder.setPriority(convertToProtoFormat(this.priority));
} }
if (this.applicationTimeoutList != null) { if (this.applicationTimeouts != null) {
addLocalApplicationTimeoutToProto(); addApplicationTimeouts();
} }
} }
@ -679,61 +682,38 @@ public void setAmNodeLabelExpression(String amNodeLabelExpression) {
} }
@Override @Override
public List<ApplicationTimeout> getApplicationTimeouts() { public Map<ApplicationTimeoutType, ApplicationTimeout> getApplicationTimeouts() {
initLocalApplicationsList(); initApplicationTimeout();
return this.applicationTimeoutList; return this.applicationTimeouts;
} }
private void initLocalApplicationsList() { @Override
if (this.applicationTimeoutList != null) { public void setApplicationTimeouts(
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts) {
if (timeouts == null) {
return;
}
initApplicationTimeout();
this.applicationTimeouts.clear();
this.applicationTimeouts.putAll(timeouts);
}
private void initApplicationTimeout() {
if (this.applicationTimeouts != null) {
return; return;
} }
ApplicationReportProtoOrBuilder p = viaProto ? proto : builder; ApplicationReportProtoOrBuilder p = viaProto ? proto : builder;
List<ApplicationTimeoutProto> list = p.getApplicationTimeoutsList(); List<AppTimeoutsMapProto> lists = p.getAppTimeoutsList();
this.applicationTimeoutList = new ArrayList<ApplicationTimeout>(); this.applicationTimeouts =
new HashMap<ApplicationTimeoutType, ApplicationTimeout>(lists.size());
for (ApplicationTimeoutProto a : list) { for (AppTimeoutsMapProto timeoutProto : lists) {
this.applicationTimeoutList.add(convertFromProtoFormat(a)); this.applicationTimeouts.put(
ProtoUtils
.convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()),
convertFromProtoFormat(timeoutProto.getApplicationTimeout()));
} }
} }
private void addLocalApplicationTimeoutToProto() {
maybeInitBuilder();
builder.clearApplicationTimeouts();
if (applicationTimeoutList == null) {
return;
}
Iterable<ApplicationTimeoutProto> iterable =
new Iterable<ApplicationTimeoutProto>() {
@Override
public Iterator<ApplicationTimeoutProto> iterator() {
return new Iterator<ApplicationTimeoutProto>() {
private Iterator<ApplicationTimeout> iter =
applicationTimeoutList.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ApplicationTimeoutProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllApplicationTimeouts(iterable);
}
private ApplicationTimeoutPBImpl convertFromProtoFormat( private ApplicationTimeoutPBImpl convertFromProtoFormat(
ApplicationTimeoutProto p) { ApplicationTimeoutProto p) {
return new ApplicationTimeoutPBImpl(p); return new ApplicationTimeoutPBImpl(p);
@ -743,12 +723,45 @@ private ApplicationTimeoutProto convertToProtoFormat(ApplicationTimeout t) {
return ((ApplicationTimeoutPBImpl) t).getProto(); return ((ApplicationTimeoutPBImpl) t).getProto();
} }
@Override private void addApplicationTimeouts() {
public void setApplicationTimeouts(List<ApplicationTimeout> timeouts) {
maybeInitBuilder(); maybeInitBuilder();
if (timeouts == null) { builder.clearAppTimeouts();
builder.clearApplicationTimeouts(); if (applicationTimeouts == null) {
return;
} }
this.applicationTimeoutList = timeouts; Iterable<? extends AppTimeoutsMapProto> values =
new Iterable<AppTimeoutsMapProto>() {
@Override
public Iterator<AppTimeoutsMapProto> iterator() {
return new Iterator<AppTimeoutsMapProto>() {
private Iterator<ApplicationTimeoutType> iterator =
applicationTimeouts.keySet().iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public AppTimeoutsMapProto next() {
ApplicationTimeoutType key = iterator.next();
return AppTimeoutsMapProto.newBuilder()
.setApplicationTimeout(
convertToProtoFormat(applicationTimeouts.get(key)))
.setApplicationTimeoutType(
ProtoUtils.convertToProtoFormat(key))
.build();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
this.builder.addAllAppTimeouts(values);
} }
} }

View File

@ -723,10 +723,17 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
long timeoutInMillis = applicationTimeouts long timeoutInMillis = applicationTimeouts
.get(ApplicationTimeoutType.LIFETIME).longValue(); .get(ApplicationTimeoutType.LIFETIME).longValue();
timeout.setExpiryTime(Times.formatISO8601(timeoutInMillis)); timeout.setExpiryTime(Times.formatISO8601(timeoutInMillis));
timeout.setRemainingTime( if (isAppInCompletedStates()) {
Math.max((timeoutInMillis - systemClock.getTime()) / 1000, 0)); // if application configured with timeout and finished before timeout
// happens then remaining time should not be calculated.
timeout.setRemainingTime(0);
} else {
timeout.setRemainingTime(
Math.max((timeoutInMillis - systemClock.getTime()) / 1000, 0));
}
} }
report.setApplicationTimeouts(Collections.singletonList(timeout)); report.setApplicationTimeouts(
Collections.singletonMap(timeout.getTimeoutType(), timeout));
return report; return report;
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();

View File

@ -24,7 +24,6 @@
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -136,15 +135,13 @@ public void testApplicationLifetimeMonitor() throws Exception {
GetApplicationReportRequest appRequest = GetApplicationReportRequest appRequest =
recordFactory.newRecordInstance(GetApplicationReportRequest.class); recordFactory.newRecordInstance(GetApplicationReportRequest.class);
appRequest.setApplicationId(app2.getApplicationId()); appRequest.setApplicationId(app2.getApplicationId());
List<ApplicationTimeout> applicationTimeoutList = rm.getRMContext() Map<ApplicationTimeoutType, ApplicationTimeout> appTimeouts = rm
.getClientRMService().getApplicationReport(appRequest) .getRMContext().getClientRMService().getApplicationReport(appRequest)
.getApplicationReport().getApplicationTimeouts(); .getApplicationReport().getApplicationTimeouts();
Assert.assertTrue("Application Timeout list are empty.", Assert.assertTrue("Application Timeout are empty.",
!applicationTimeoutList.isEmpty()); !appTimeouts.isEmpty());
ApplicationTimeout timeout = applicationTimeoutList.iterator().next(); ApplicationTimeout timeout =
Assert.assertEquals("Application timeout Type is incorrect.", appTimeouts.get(ApplicationTimeoutType.LIFETIME);
ApplicationTimeoutType.LIFETIME.toString(),
timeout.getTimeoutType().toString());
Assert.assertEquals("Application timeout string is incorrect.", Assert.assertEquals("Application timeout string is incorrect.",
formatISO8601, timeout.getExpiryTime()); formatISO8601, timeout.getExpiryTime());
Assert.assertTrue("Application remaining time is incorrect", Assert.assertTrue("Application remaining time is incorrect",