YARN-5585. [Atsv2] Reader side changes for entity prefix and support for pagination via additional filters (Rohith Sharma K S via Varun Saxena)

(cherry picked from commit 9a2f288a6a9e30376b7fd99bf8824184a34a54c9)
This commit is contained in:
Varun Saxena 2017-01-07 01:38:36 +05:30
parent d3ef478955
commit 1e5e4a52f7
20 changed files with 511 additions and 159 deletions

View File

@ -549,20 +549,10 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
public int compareTo(TimelineEntity other) {
int comparison = getType().compareTo(other.getType());
if (comparison == 0) {
if (getCreatedTime() == null) {
if (other.getCreatedTime() == null) {
return getId().compareTo(other.getId());
} else {
return 1;
}
}
if (other.getCreatedTime() == null) {
if (getIdPrefix() > other.getIdPrefix()) {
// Descending order by entity id prefix
return -1;
}
if (getCreatedTime() > other.getCreatedTime()) {
// Order by created time desc
return -1;
} else if (getCreatedTime() < other.getCreatedTime()) {
} else if (getIdPrefix() < other.getIdPrefix()) {
return 1;
} else {
return getId().compareTo(other.getId());

View File

@ -214,7 +214,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
entity4.addMetrics(metrics);
te4.addEntity(entity4);
TimelineEntities te5 = new TimelineEntities();
TimelineEntities userEntities = new TimelineEntities();
TimelineEntity entity5 = new TimelineEntity();
entity5.setId("entity1");
entity5.setType("type1");
@ -270,7 +270,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
relatesTo1.put("type3",
Sets.newHashSet("entity31", "entity35", "entity32", "entity33"));
entity5.addRelatesToEntities(relatesTo1);
te5.addEntity(entity5);
userEntities.addEntity(entity5);
TimelineEntity entity6 = new TimelineEntity();
entity6.setId("entity2");
@ -329,7 +329,16 @@ public class TestTimelineReaderWebServicesHBaseStorage {
relatesTo2.put("type6", Sets.newHashSet("entity61", "entity66"));
relatesTo2.put("type3", Sets.newHashSet("entity31"));
entity6.addRelatesToEntities(relatesTo2);
te5.addEntity(entity6);
userEntities.addEntity(entity6);
for (long i = 1; i <= 10; i++) {
TimelineEntity userEntity = new TimelineEntity();
userEntity.setType("entitytype");
userEntity.setId("entityid-" + i);
userEntity.setIdPrefix(11 - i);
userEntity.setCreatedTime(System.currentTimeMillis());
userEntities.addEntity(userEntity);
}
HBaseTimelineWriterImpl hbi = null;
Configuration c1 = util.getConfiguration();
@ -342,7 +351,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
hbi.write(cluster, user, flow2,
flowVersion2, runid2, entity3.getId(), te3);
hbi.write(cluster, user, flow, flowVersion, runid,
"application_1111111111_1111", te5);
"application_1111111111_1111", userEntities);
hbi.flush();
} finally {
if (hbi != null) {
@ -784,7 +793,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
assertEquals(TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(
new TimelineReaderContext(context.getClusterId(),
context.getUserId(), context.getFlowName(),
context.getFlowRunId(), context.getAppId(), "type1",
context.getFlowRunId(), context.getAppId(), "type1",
entity.getIdPrefix(),
entity.getId())), entityUID);
}
}
@ -860,8 +870,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
String uid =
(String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
assertNotNull(uid);
assertTrue(uid.equals(appUIDWithFlowInfo + "!type1!entity1") ||
uid.equals(appUIDWithFlowInfo + "!type1!entity2"));
assertTrue(uid.equals(appUIDWithFlowInfo + "!type1!0!entity1")
|| uid.equals(appUIDWithFlowInfo + "!type1!0!entity2"));
}
String appUIDWithoutFlowInfo = "cluster1!application_1111111111_1111";
@ -887,11 +897,11 @@ public class TestTimelineReaderWebServicesHBaseStorage {
String uid =
(String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
assertNotNull(uid);
assertTrue(uid.equals(appUIDWithoutFlowInfo + "!type1!entity1") ||
uid.equals(appUIDWithoutFlowInfo + "!type1!entity2"));
assertTrue(uid.equals(appUIDWithoutFlowInfo + "!type1!0!entity1")
|| uid.equals(appUIDWithoutFlowInfo + "!type1!0!entity2"));
}
String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!entity1";
String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!0!entity1";
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
"entity-uid/" + entityUIDWithFlowInfo);
resp = getResponse(client, uri);
@ -901,7 +911,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
assertEquals("entity1", singleEntity1.getId());
String entityUIDWithoutFlowInfo =
appUIDWithoutFlowInfo + "!type1!entity1";
appUIDWithoutFlowInfo + "!type1!0!entity1";
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
"entity-uid/" + entityUIDWithoutFlowInfo);
resp = getResponse(client, uri);
@ -2162,4 +2172,74 @@ public class TestTimelineReaderWebServicesHBaseStorage {
server = null;
}
}
@Test
public void testGenericEntitiesForPagination() throws Exception {
Client client = createClient();
try {
int limit = 10;
String queryParam = "?limit=" + limit;
String resourceUri = "http://localhost:" + serverPort + "/ws/v2/"
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/"
+ "entities/entitytype";
URI uri = URI.create(resourceUri + queryParam);
ClientResponse resp = getResponse(client, uri);
List<TimelineEntity> entities =
resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-10 to entity-1 in descending order.
verifyPaginatedEntites(entities, limit, limit);
limit = 4;
queryParam = "?limit=" + limit;
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-10 to entity-7 in descending order.
TimelineEntity entity = verifyPaginatedEntites(entities, limit, 10);
queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix()
+ "&&fromid=" + entity.getId();
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-7 to entity-4 in descending order.
entity = verifyPaginatedEntites(entities, limit, 7);
queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix();
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
// verify for entity-4 to entity-1 in descending order.
entity = verifyPaginatedEntites(entities, limit, 4);
queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix();
uri = URI.create(resourceUri + queryParam);
resp = getResponse(client, uri);
entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
});
// always entity-1 will be retrieved
entity = verifyPaginatedEntites(entities, 1, 1);
} finally {
client.destroy();
}
}
private TimelineEntity verifyPaginatedEntites(List<TimelineEntity> entities,
int limit, int startFrom) {
assertNotNull(entities);
assertEquals(limit, entities.size());
TimelineEntity entity = null;
for (TimelineEntity timelineEntity : entities) {
assertEquals("entitytype", timelineEntity.getType());
assertEquals("entityid-" + startFrom, timelineEntity.getId());
assertEquals(11 - startFrom--, timelineEntity.getIdPrefix());
entity = timelineEntity;
}
return entity;
}
}

View File

@ -35,10 +35,10 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyVa
* <li><b>limit</b> - A limit on the number of entities to return. If null or
* {@literal < 0}, defaults to {@link #DEFAULT_LIMIT}. The maximum possible
* value for limit can be {@link Long#MAX_VALUE}.</li>
* <li><b>createdTimeBegin</b> - Matched entities should not be created
* before this timestamp. If null or {@literal <=0}, defaults to 0.</li>
* <li><b>createdTimeEnd</b> - Matched entities should not be created after
* this timestamp. If null or {@literal <=0}, defaults to
* <li><b>createdTimeBegin</b> - Matched entities should not be created before
* this timestamp. If null or {@literal <=0}, defaults to 0.</li>
* <li><b>createdTimeEnd</b> - Matched entities should not be created after this
* timestamp. If null or {@literal <=0}, defaults to
* {@link Long#MAX_VALUE}.</li>
* <li><b>relatesTo</b> - Matched entities should or should not relate to given
* entities depending on what's specified in the filter. The entities in
@ -99,6 +99,19 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyVa
* filter list, event filters can be evaluated with logical AND/OR and we can
* create a hierarchy of these {@link TimelineExistsFilter} objects. If null or
* empty, the filter is not applied.</li>
* <li><b>fromIdPrefix</b> - If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix is same
* for all entities of a given entity type, then the user must provide fromId as
* a filter to denote the start entity from which further entities will be
* fetched. fromIdPrefix is mandatory even in the case the entity id prefix is
* not used and should be set to 0.</li>
* <li><b>fromId</b> - If specified along with fromIdPrefix, retrieve entities
* with an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal to entity
* id specified in fromId. Please note than fromIdPrefix is mandatory if fromId
* is specified, otherwise, the filter will be ignored. It is recommended to
* provide both fromIdPrefix and fromId filters for more accurate results as id
* prefix may not be unique for an entity.</li>
* </ul>
*/
@Private
@ -113,9 +126,12 @@ public class TimelineEntityFilters {
private TimelineFilterList configFilters;
private TimelineFilterList metricFilters;
private TimelineFilterList eventFilters;
private Long fromIdPrefix;
private String fromId;
private static final long DEFAULT_BEGIN_TIME = 0L;
private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
/**
* Default limit of number of entities to return for getEntities API.
*/
@ -125,6 +141,19 @@ public class TimelineEntityFilters {
this(null, null, null, null, null, null, null, null, null);
}
public TimelineEntityFilters(Long entityLimit, Long timeBegin, Long timeEnd,
TimelineFilterList entityRelatesTo, TimelineFilterList entityIsRelatedTo,
TimelineFilterList entityInfoFilters,
TimelineFilterList entityConfigFilters,
TimelineFilterList entityMetricFilters,
TimelineFilterList entityEventFilters, Long fromidprefix, String fromid) {
this(entityLimit, timeBegin, timeEnd, entityRelatesTo, entityIsRelatedTo,
entityInfoFilters, entityConfigFilters, entityMetricFilters,
entityEventFilters);
this.fromIdPrefix = fromidprefix;
this.fromId = fromid;
}
public TimelineEntityFilters(
Long entityLimit, Long timeBegin, Long timeEnd,
TimelineFilterList entityRelatesTo,
@ -239,4 +268,20 @@ public class TimelineEntityFilters {
public void setEventFilters(TimelineFilterList filters) {
this.eventFilters = filters;
}
public String getFromId() {
return fromId;
}
public void setFromId(String fromId) {
this.fromId = fromId;
}
public Long getFromIdPrefix() {
return fromIdPrefix;
}
public void setFromIdPrefix(Long fromIdPrefix) {
this.fromIdPrefix = fromIdPrefix;
}
}

View File

@ -31,6 +31,7 @@ public class TimelineReaderContext extends TimelineContext {
private String entityType;
private String entityId;
private Long entityIdPrefix;
public TimelineReaderContext(String clusterId, String userId, String flowName,
Long flowRunId, String appId, String entityType, String entityId) {
super(clusterId, userId, flowName, flowRunId, appId);
@ -38,16 +39,25 @@ public class TimelineReaderContext extends TimelineContext {
this.entityId = entityId;
}
public TimelineReaderContext(String clusterId, String userId, String flowName,
Long flowRunId, String appId, String entityType, Long entityIdPrefix,
String entityId) {
this(clusterId, userId, flowName, flowRunId, appId, entityType, entityId);
this.entityIdPrefix = entityIdPrefix;
}
public TimelineReaderContext(TimelineReaderContext other) {
this(other.getClusterId(), other.getUserId(), other.getFlowName(),
other.getFlowRunId(), other.getAppId(), other.getEntityType(),
other.getEntityId());
other.getEntityIdPrefix(), other.getEntityId());
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result
+ ((entityIdPrefix == null) ? 0 : entityIdPrefix.hashCode());
result = prime * result + ((entityId == null) ? 0 : entityId.hashCode());
result =
prime * result + ((entityType == null) ? 0 : entityType.hashCode());
@ -95,4 +105,12 @@ public class TimelineReaderContext extends TimelineContext {
public void setEntityId(String id) {
this.entityId = id;
}
public Long getEntityIdPrefix() {
return entityIdPrefix;
}
public void setEntityIdPrefix(Long entityIdPrefix) {
this.entityIdPrefix = entityIdPrefix;
}
}

View File

@ -113,6 +113,7 @@ public class TimelineReaderManager extends AbstractService {
}
}
context.setEntityType(entity.getType());
context.setEntityIdPrefix(entity.getIdPrefix());
context.setEntityId(entity.getId());
entity.setUID(UID_KEY,
TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context));

View File

@ -264,6 +264,20 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix
* is same for all entities of a given entity type, then the user must
* provide fromId as a filter to denote the start entity from which
* further entities will be fetched. fromIdPrefix is mandatory even
* in the case the entity id prefix is not used and should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances of the given entity type
@ -294,7 +308,9 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString());
@ -317,7 +333,8 @@ public class TimelineReaderWebServices {
entities = timelineReaderManager.getEntities(context,
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters),
infofilters, conffilters, metricfilters, eventfilters,
fromIdPrefix, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
} catch (Exception e) {
@ -400,6 +417,20 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix
* is same for all entities of a given entity type, then the user must
* provide fromId as a filter to denote the start entity from which
* further entities will be fetched. fromIdPrefix is mandatory even
* in the case the entity id prefix is not used and should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances of the given entity type
@ -435,11 +466,14 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getEntities(req, res, null, appId, entityType, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix,
fromId);
}
/**
@ -510,6 +544,20 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If fromIdPrefix
* is same for all entities of a given entity type, then the user must
* provide fromId as a filter to denote the start entity from which
* further entities will be fetched. fromIdPrefix is mandatory even
* in the case the entity id prefix is not used and should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances of the given entity type
@ -546,7 +594,9 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString());
@ -559,12 +609,14 @@ public class TimelineReaderWebServices {
TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
Set<TimelineEntity> entities = null;
try {
entities = timelineReaderManager.getEntities(
TimelineReaderWebServicesUtils.createTimelineReaderContext(
clusterId, userId, flowName, flowRunId, appId, entityType, null),
TimelineReaderContext context = TimelineReaderWebServicesUtils
.createTimelineReaderContext(clusterId, userId, flowName, flowRunId,
appId, entityType, null, null);
entities = timelineReaderManager.getEntities(context,
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters),
infofilters, conffilters, metricfilters, eventfilters,
fromIdPrefix, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
} catch (Exception e) {
@ -703,6 +755,8 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br>
@ -729,10 +783,11 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("entityidprefix") String entityIdPrefix) {
return getEntity(req, res, null, appId, entityType, entityId, userId,
flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
metricsLimit);
metricsLimit, entityIdPrefix);
}
/**
@ -774,6 +829,8 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br>
@ -801,7 +858,8 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("entityidprefix") String entityIdPrefix) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString());
@ -816,7 +874,8 @@ public class TimelineReaderWebServices {
try {
entity = timelineReaderManager.getEntity(
TimelineReaderWebServicesUtils.createTimelineReaderContext(
clusterId, userId, flowName, flowRunId, appId, entityType, entityId),
clusterId, userId, flowName, flowRunId, appId, entityType,
entityIdPrefix, entityId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
} catch (Exception e) {
@ -992,7 +1051,7 @@ public class TimelineReaderWebServices {
entity = timelineReaderManager.getEntity(
TimelineReaderWebServicesUtils.createTimelineReaderContext(
clusterId, userId, flowName, flowRunId, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, null, null));
} catch (Exception e) {
@ -1081,7 +1140,7 @@ public class TimelineReaderWebServices {
entities = timelineReaderManager.getEntities(context,
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, null, null, null,
null, null, null),
null, null, null, null, null),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null));
} catch (Exception e) {
@ -1217,10 +1276,10 @@ public class TimelineReaderWebServices {
entities = timelineReaderManager.getEntities(
TimelineReaderWebServicesUtils.createTimelineReaderContext(
clusterId, userId, flowName, null, null,
TimelineEntityType.YARN_FLOW_RUN.toString(), null),
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null),
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, null, null, null,
null, null, null),
null, null, null, null, null),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null));
} catch (Exception e) {
@ -1339,13 +1398,14 @@ public class TimelineReaderWebServices {
DateRange range = parseDateRange(dateRange);
TimelineEntityFilters entityFilters =
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, null, null, null, null, null, null, null, null);
limit, null, null, null, null, null, null, null, null, null,
null);
entityFilters.setCreatedTimeBegin(range.dateStart);
entityFilters.setCreatedTimeEnd(range.dateEnd);
entities = timelineReaderManager.getEntities(
TimelineReaderWebServicesUtils.createTimelineReaderContext(
clusterId, null, null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null),
entityFilters, TimelineReaderWebServicesUtils.
createTimelineDataToRetrieve(null, null, null, null));
} catch (Exception e) {
@ -1584,7 +1644,7 @@ public class TimelineReaderWebServices {
entity = timelineReaderManager.getEntity(
TimelineReaderWebServicesUtils.createTimelineReaderContext(
clusterId, userId, flowName, flowRunId, appId,
TimelineEntityType.YARN_APPLICATION.toString(), null),
TimelineEntityType.YARN_APPLICATION.toString(), null, null),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
} catch (Exception e) {
@ -1710,7 +1770,8 @@ public class TimelineReaderWebServices {
entities = timelineReaderManager.getEntities(context,
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters),
infofilters, conffilters, metricfilters, eventfilters, null,
null),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
} catch (Exception e) {
@ -1822,7 +1883,7 @@ public class TimelineReaderWebServices {
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
}
/**
@ -1924,7 +1985,7 @@ public class TimelineReaderWebServices {
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
}
/**
@ -2020,7 +2081,7 @@ public class TimelineReaderWebServices {
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
}
/**
@ -2118,7 +2179,7 @@ public class TimelineReaderWebServices {
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
}
/**
@ -2189,6 +2250,21 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the app-attempt
@ -2221,12 +2297,14 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getAppAttempts(req, res, null, appId, userId, flowName, flowRunId,
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, confsToRetrieve,
metricsToRetrieve, fields, metricsLimit);
metricsToRetrieve, fields, metricsLimit, fromIdPrefix, fromId);
}
/**
@ -2298,6 +2376,21 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the app-attempts
@ -2331,13 +2424,16 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getEntities(req, res, clusterId, appId,
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), userId,
flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
fromIdPrefix, fromId);
}
/**
@ -2380,6 +2476,8 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br>
@ -2404,9 +2502,11 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("entityidprefix") String entityIdPrefix) {
return getAppAttempt(req, res, null, appId, appAttemptId, userId, flowName,
flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
entityIdPrefix);
}
/**
@ -2449,6 +2549,8 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br>
@ -2475,11 +2577,12 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("entityidprefix") String entityIdPrefix) {
return getEntity(req, res, clusterId, appId,
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttemptId,
userId, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
metricsLimit);
metricsLimit, entityIdPrefix);
}
/**
@ -2552,6 +2655,21 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the containers
@ -2585,11 +2703,14 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
return getContainers(req, res, null, appId, appattemptId, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix,
fromId);
}
/**
@ -2663,6 +2784,21 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param fromIdPrefix If specified, retrieve entities with an id prefix
* greater than or equal to the specified fromIdPrefix. If
* fromIdPrefix is same for all entities of a given entity type, then
* the user must provide fromId as a filter to denote the start
* entity from which further entities will be fetched. fromIdPrefix
* is mandatory even in the case the entity id prefix is not used and
* should be set to 0.
* @param fromId If specified along with fromIdPrefix, retrieve entities with
* an id prefix greater than or equal to specified id prefix in
* fromIdPrefix and entity id lexicographically greater than or equal
* to entity id specified in fromId. Please note than fromIdPrefix is
* mandatory if fromId is specified, otherwise, the filter will be
* ignored. It is recommended to provide both fromIdPrefix and fromId
* filters for more accurate results as id prefix may not be unique
* for an entity.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>TimelineEntity</cite> instances of the containers
@ -2698,7 +2834,9 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("fromidprefix") String fromIdPrefix,
@QueryParam("fromid") String fromId) {
String entityType = TimelineEntityType.YARN_CONTAINER.toString();
String parentEntityType =
@ -2716,7 +2854,8 @@ public class TimelineReaderWebServices {
return getEntities(req, res, clusterId, appId, entityType, userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilter, conffilters, metricfilters, eventfilters,
confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix,
fromId);
}
/**
@ -2758,6 +2897,8 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* <cite>TimelineEntity</cite> instance is returned.<br>
@ -2782,9 +2923,11 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("entityidprefix") String entityIdPrefix) {
return getContainer(req, res, null, appId, containerId, userId, flowName,
flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit,
entityIdPrefix);
}
/**
@ -2827,6 +2970,8 @@ public class TimelineReaderWebServices {
* have to be retrieved, then metricsLimit will be considered as 1
* i.e. latest single value of metric(s) will be returned. (Optional
* query param).
* @param entityIdPrefix Defines the id prefix for the entity to be fetched.
* If specified, then entity retrieval will be faster.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* <cite>TimelineEntity</cite> instance is returned.<br>
@ -2853,11 +2998,12 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
@QueryParam("metricslimit") String metricsLimit) {
@QueryParam("metricslimit") String metricsLimit,
@QueryParam("entityidprefix") String entityIdPrefix) {
return getEntity(req, res, clusterId, appId,
TimelineEntityType.YARN_CONTAINER.toString(), containerId, userId,
flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
metricsLimit);
metricsLimit, entityIdPrefix);
}
/**
@ -2952,7 +3098,7 @@ public class TimelineReaderWebServices {
results = timelineReaderManager.getEntityTypes(
TimelineReaderWebServicesUtils.createTimelineReaderContext(
clusterId, userId, flowName, flowRunId, appId,
null, null));
null, null, null));
} catch (Exception e) {
handleException(e, url, startTime, "flowrunid");
}

View File

@ -23,6 +23,7 @@ import java.util.EnumSet;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@ -49,10 +50,10 @@ final class TimelineReaderWebServicesUtils {
*/
static TimelineReaderContext createTimelineReaderContext(String clusterId,
String userId, String flowName, String flowRunId, String appId,
String entityType, String entityId) {
String entityType, String entityIdPrefix, String entityId) {
return new TimelineReaderContext(parseStr(clusterId), parseStr(userId),
parseStr(flowName), parseLongStr(flowRunId), parseStr(appId),
parseStr(entityType), parseStr(entityId));
parseStr(entityType), parseLongStr(entityIdPrefix), parseStr(entityId));
}
/**
@ -73,12 +74,14 @@ final class TimelineReaderWebServicesUtils {
static TimelineEntityFilters createTimelineEntityFilters(String limit,
String createdTimeStart, String createdTimeEnd, String relatesTo,
String isRelatedTo, String infofilters, String conffilters,
String metricfilters, String eventfilters) throws TimelineParseException {
String metricfilters, String eventfilters, String fromidprefix,
String fromid) throws TimelineParseException {
return new TimelineEntityFilters(parseLongStr(limit),
parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd),
parseRelationFilters(relatesTo), parseRelationFilters(isRelatedTo),
parseKVFilters(infofilters, false), parseKVFilters(conffilters, true),
parseMetricFilters(metricfilters), parseEventFilters(eventfilters));
parseMetricFilters(metricfilters), parseEventFilters(eventfilters),
parseLongStr(fromidprefix), parseStr(fromid));
}
/**
@ -207,7 +210,7 @@ final class TimelineReaderWebServicesUtils {
* @return trimmed string if string is not null, null otherwise.
*/
static String parseStr(String str) {
return str == null ? null : str.trim();
return StringUtils.trimToNull(str);
}
/**

View File

@ -155,12 +155,14 @@ enum TimelineUIDConverter {
// Flow information exists.
String[] entityTupleArr = {context.getClusterId(), context.getUserId(),
context.getFlowName(), context.getFlowRunId().toString(),
context.getAppId(), context.getEntityType(), context.getEntityId()};
context.getAppId(), context.getEntityType(),
context.getEntityIdPrefix().toString(), context.getEntityId() };
return joinAndEscapeUIDParts(entityTupleArr);
} else {
// Only entity and app information exists. Flow info does not exist.
String[] entityTupleArr = {context.getClusterId(), context.getAppId(),
context.getEntityType(), context.getEntityId()};
context.getEntityType(), context.getEntityIdPrefix().toString(),
context.getEntityId() };
return joinAndEscapeUIDParts(entityTupleArr);
}
}
@ -171,20 +173,21 @@ enum TimelineUIDConverter {
return null;
}
List<String> entityTupleList = splitUID(uId);
// Should have 7 parts i.e. cluster, user, flow name, flowrun id, app id,
// entity type and entity id OR should have 4 parts i.e. cluster, app id,
// Should have 8 parts i.e. cluster, user, flow name, flowrun id, app id,
// entity type and entity id OR should have 5 parts i.e. cluster, app id,
// entity type and entity id.
if (entityTupleList.size() == 7) {
if (entityTupleList.size() == 8) {
// Flow information exists.
return new TimelineReaderContext(entityTupleList.get(0),
entityTupleList.get(1), entityTupleList.get(2),
Long.parseLong(entityTupleList.get(3)), entityTupleList.get(4),
entityTupleList.get(5), entityTupleList.get(6));
} else if (entityTupleList.size() == 4) {
entityTupleList.get(5), Long.parseLong(entityTupleList.get(6)),
entityTupleList.get(7));
} else if (entityTupleList.size() == 5) {
// Flow information does not exist.
return new TimelineReaderContext(entityTupleList.get(0), null, null,
null, entityTupleList.get(1), entityTupleList.get(2),
entityTupleList.get(3));
Long.parseLong(entityTupleList.get(3)), entityTupleList.get(4));
} else {
return null;
}

View File

@ -180,6 +180,23 @@ public final class TimelineFilterUtils {
return list;
}
/**
* Creates a HBase {@link SingleColumnValueFilter} with specified column.
* @param <T> Describes the type of column prefix.
* @param column Column which value to be filtered.
* @param value Value to be filtered.
* @param op Compare operator
* @return a SingleColumnValue Filter
* @throws IOException if any exception.
*/
public static <T> Filter createHBaseSingleColValueFilter(Column<T> column,
Object value, CompareOp op) throws IOException {
Filter singleColValFilter = createHBaseSingleColValueFilter(
column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
column.getValueConverter().encodeValue(value), op, true);
return singleColValFilter;
}
/**
* Creates a HBase {@link SingleColumnValueFilter}.
*

View File

@ -125,8 +125,8 @@ public interface TimelineReader extends Service {
* <li><b>flowRunId</b> - Context flow run id.</li>
* <li><b>appId</b> - Context app id.</li>
* </ul>
* Although entityId is also part of context, it has no meaning for
* getEntities.<br>
* Although entityIdPrefix and entityId are also part of context,
* it has no meaning for getEntities.<br>
* Fields in context which are mandatory depends on entity type. Entity
* type is always mandatory. In addition to entity type, below is the list
* of context fields which are mandatory, based on entity type.<br>
@ -161,8 +161,10 @@ public interface TimelineReader extends Service {
* {@link TimelineDataToRetrieve} for details.
* @return A set of <cite>TimelineEntity</cite> instances of the given entity
* type in the given context scope which matches the given predicates
* ordered by created time, descending. Each entity will only contain the
* metadata(id, type and created time) plus the given fields to retrieve.
* ordered by enitityIdPrefix(for generic entities only).
* Each entity will only contain
* the metadata(id, type , idPrefix and created time) plus the given
* fields to retrieve.
* <br>
* If entityType is YARN_FLOW_ACTIVITY, entities returned are of type
* <cite>FlowActivityEntity</cite>.<br>

View File

@ -33,13 +33,13 @@ public class EntityRowKey {
private final Long flowRunId;
private final String appId;
private final String entityType;
private final long entityIdPrefix;
private final Long entityIdPrefix;
private final String entityId;
private final KeyConverter<EntityRowKey> entityRowKeyConverter =
new EntityRowKeyConverter();
public EntityRowKey(String clusterId, String userId, String flowName,
Long flowRunId, String appId, String entityType, long entityIdPrefix,
Long flowRunId, String appId, String entityType, Long entityIdPrefix,
String entityId) {
this.clusterId = clusterId;
this.userId = userId;
@ -79,7 +79,7 @@ public class EntityRowKey {
return entityId;
}
public long getEntityIdPrefix() {
public Long getEntityIdPrefix() {
return entityIdPrefix;
}
@ -180,14 +180,24 @@ public class EntityRowKey {
Separator.encode(rowKey.getEntityType(), Separator.SPACE,
Separator.TAB, Separator.QUALIFIERS);
if (rowKey.getEntityIdPrefix() == null) {
return Separator.QUALIFIERS.join(first, second, third, entityType,
Separator.EMPTY_BYTES);
}
byte[] enitityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix());
byte[] entityId =
rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator
.encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS);
if (rowKey.getEntityId() == null) {
return Separator.QUALIFIERS.join(first, second, third, entityType,
enitityIdPrefix, Separator.EMPTY_BYTES);
}
byte[] entityId = Separator.encode(rowKey.getEntityId(), Separator.SPACE,
Separator.TAB, Separator.QUALIFIERS);
byte[] fourth =
Separator.QUALIFIERS.join(entityType, enitityIdPrefix, entityId);
return Separator.QUALIFIERS.join(first, second, third, fourth);
}
@ -227,7 +237,7 @@ public class EntityRowKey {
Separator.decode(Bytes.toString(rowKeyComponents[5]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
long entityPrefixId = Bytes.toLong(rowKeyComponents[6]);
Long entityPrefixId = Bytes.toLong(rowKeyComponents[6]);
String entityId =
Separator.decode(Bytes.toString(rowKeyComponents[7]),

View File

@ -31,18 +31,20 @@ public class EntityRowKeyPrefix extends EntityRowKey implements
* Creates a prefix which generates the following rowKeyPrefixes for the
* entity table:
* {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}.
*
* @param clusterId identifying the cluster
* @param userId identifying the user
* @param flowName identifying the flow
* @param flowRunId identifying the individual run of this flow
* @param appId identifying the application
* @param entityType which entity type
* @param entityIdPrefix for entityId
* @param entityId for an entity
*/
public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
Long flowRunId, String appId, String entityType) {
// TODO YARN-5585, change prefix id from 0L
super(clusterId, userId, flowName, flowRunId, appId, entityType, 0L, null);
Long flowRunId, String appId, String entityType, Long entityIdPrefix,
String entityId) {
super(clusterId, userId, flowName, flowRunId, appId, entityType,
entityIdPrefix, entityId);
}
/**
@ -58,8 +60,7 @@ public class EntityRowKeyPrefix extends EntityRowKey implements
*/
public EntityRowKeyPrefix(String clusterId, String userId, String flowName,
Long flowRunId, String appId) {
// TODO YARN-5585, change prefix id from 0L
super(clusterId, userId, flowName, flowRunId, appId, null, 0L, null);
this(clusterId, userId, flowName, flowRunId, appId, null, null, null);
}
/*

View File

@ -64,7 +64,7 @@ class ApplicationEntityReader extends GenericEntityReader {
public ApplicationEntityReader(TimelineReaderContext ctxt,
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
super(ctxt, entityFilters, toRetrieve, true);
super(ctxt, entityFilters, toRetrieve);
}
public ApplicationEntityReader(TimelineReaderContext ctxt,

View File

@ -60,7 +60,7 @@ class FlowActivityEntityReader extends TimelineEntityReader {
public FlowActivityEntityReader(TimelineReaderContext ctxt,
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
super(ctxt, entityFilters, toRetrieve, true);
super(ctxt, entityFilters, toRetrieve);
}
public FlowActivityEntityReader(TimelineReaderContext ctxt,

View File

@ -63,7 +63,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
public FlowRunEntityReader(TimelineReaderContext ctxt,
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
super(ctxt, entityFilters, toRetrieve, true);
super(ctxt, entityFilters, toRetrieve);
}
public FlowRunEntityReader(TimelineReaderContext ctxt,

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
@ -70,9 +73,8 @@ class GenericEntityReader extends TimelineEntityReader {
new StringKeyConverter();
public GenericEntityReader(TimelineReaderContext ctxt,
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
boolean sortedKeys) {
super(ctxt, entityFilters, toRetrieve, sortedKeys);
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
super(ctxt, entityFilters, toRetrieve);
}
public GenericEntityReader(TimelineReaderContext ctxt,
@ -424,18 +426,44 @@ class GenericEntityReader extends TimelineEntityReader {
protected Result getResult(Configuration hbaseConf, Connection conn,
FilterList filterList) throws IOException {
TimelineReaderContext context = getContext();
byte[] rowKey =
new EntityRowKey(context.getClusterId(), context.getUserId(),
context.getFlowName(), context.getFlowRunId(), context.getAppId(),
// TODO YARN-5585, change prefix id from 0L
context.getEntityType(), 0L, context.getEntityId()).getRowKey();
Result result = null;
if (context.getEntityIdPrefix() != null) {
byte[] rowKey = new EntityRowKey(context.getClusterId(),
context.getUserId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId(), context.getEntityType(),
context.getEntityIdPrefix(), context.getEntityId()).getRowKey();
Get get = new Get(rowKey);
get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) {
get.setFilter(filterList);
Get get = new Get(rowKey);
get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) {
get.setFilter(filterList);
}
result = getTable().getResult(hbaseConf, conn, get);
} else {
// Prepare for range scan
// create single SingleColumnValueFilter and add to existing filters.
FilterList filter = new FilterList(Operator.MUST_PASS_ALL);
if (filterList != null && !filterList.getFilters().isEmpty()) {
filter.addFilter(filterList);
}
FilterList newFilter = new FilterList();
newFilter.addFilter(TimelineFilterUtils.createHBaseSingleColValueFilter(
EntityColumn.ID, context.getEntityId(), CompareOp.EQUAL));
newFilter.addFilter(new PageFilter(1));
filter.addFilter(newFilter);
ResultScanner results = getResults(hbaseConf, conn, filter);
try {
Iterator<Result> iterator = results.iterator();
if (iterator.hasNext()) {
result = iterator.next();
}
} finally {
results.close();
}
}
return getTable().getResult(hbaseConf, conn, get);
return result;
}
@Override
@ -445,11 +473,36 @@ class GenericEntityReader extends TimelineEntityReader {
// and one type
Scan scan = new Scan();
TimelineReaderContext context = getContext();
RowKeyPrefix<EntityRowKey> entityRowKeyPrefix =
new EntityRowKeyPrefix(context.getClusterId(), context.getUserId(),
context.getFlowName(), context.getFlowRunId(), context.getAppId(),
context.getEntityType());
scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix());
RowKeyPrefix<EntityRowKey> entityRowKeyPrefix = null;
// default mode, will always scans from beginning of entity type.
if (getFilters() == null || getFilters().getFromIdPrefix() == null) {
entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
context.getUserId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId(), context.getEntityType(), null, null);
scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix());
} else { // pagination mode, will scan from given entityIdPrefix!enitityId
entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
context.getUserId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId(), context.getEntityType(),
getFilters().getFromIdPrefix(), getFilters().getFromId());
// set start row
scan.setStartRow(entityRowKeyPrefix.getRowKeyPrefix());
// get the bytes for stop row
entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
context.getUserId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId(), context.getEntityType(), null, null);
// set stop row
scan.setStopRow(
HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
entityRowKeyPrefix.getRowKeyPrefix()));
// set page filter to limit. This filter has to set only in pagination
// mode.
filterList.addFilter(new PageFilter(getFilters().getLimit()));
}
scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
if (filterList != null && !filterList.getFilters().isEmpty()) {
scan.setFilter(filterList);
@ -463,10 +516,10 @@ class GenericEntityReader extends TimelineEntityReader {
return null;
}
TimelineEntity entity = new TimelineEntity();
String entityType = EntityColumn.TYPE.readResult(result).toString();
entity.setType(entityType);
String entityId = EntityColumn.ID.readResult(result).toString();
entity.setId(entityId);
EntityRowKey parseRowKey = EntityRowKey.parseRowKey(result.getRow());
entity.setType(parseRowKey.getEntityType());
entity.setId(parseRowKey.getEntityId());
entity.setIdPrefix(parseRowKey.getEntityIdPrefix().longValue());
TimelineEntityFilters filters = getFilters();
// fetch created time

View File

@ -21,11 +21,10 @@ import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -74,14 +73,6 @@ public abstract class TimelineEntityReader extends
*/
private BaseTable<?> table;
/**
* Specifies whether keys for this table are sorted in a manner where entities
* can be retrieved by created time. If true, it will be sufficient to collect
* the first results as specified by the limit. Otherwise all matched entities
* will be fetched and then limit applied.
*/
private boolean sortedKeys = false;
/**
* Used to convert strings key components to and from storage format.
*/
@ -95,15 +86,11 @@ public abstract class TimelineEntityReader extends
* made.
* @param entityFilters Filters which limit the entities returned.
* @param toRetrieve Data to retrieve for each entity.
* @param sortedKeys Specifies whether key for this table are sorted or not.
* If sorted, entities can be retrieved by created time.
*/
protected TimelineEntityReader(TimelineReaderContext ctxt,
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
boolean sortedKeys) {
TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
super(ctxt);
this.singleEntityRead = false;
this.sortedKeys = sortedKeys;
this.dataToRetrieve = toRetrieve;
this.filters = entityFilters;
@ -245,7 +232,7 @@ public abstract class TimelineEntityReader extends
validateParams();
augmentParams(hbaseConf, conn);
NavigableSet<TimelineEntity> entities = new TreeSet<>();
Set<TimelineEntity> entities = new LinkedHashSet<>();
FilterList filterList = createFilterList();
if (LOG.isDebugEnabled() && filterList != null) {
LOG.debug("FilterList created for scan is - " + filterList);
@ -258,14 +245,8 @@ public abstract class TimelineEntityReader extends
continue;
}
entities.add(entity);
if (!sortedKeys) {
if (entities.size() > filters.getLimit()) {
entities.pollLast();
}
} else {
if (entities.size() == filters.getLimit()) {
break;
}
if (entities.size() == filters.getLimit()) {
break;
}
}
return entities;

View File

@ -83,7 +83,7 @@ public final class TimelineEntityReaderFactory {
return new FlowRunEntityReader(context, filters, dataToRetrieve);
} else {
// assume we're dealing with a generic entity read
return new GenericEntityReader(context, filters, dataToRetrieve, false);
return new GenericEntityReader(context, filters, dataToRetrieve);
}
}

View File

@ -53,19 +53,19 @@ public class TestTimelineUIDConverter {
assertEquals(context, TimelineUIDConverter.APPLICATION_UID.decodeUID(uid));
context = new TimelineReaderContext("yarn_cluster", "root", "hive_join",
1234L, "application_1111111111_1111", "YARN_CONTAINER",
1234L, "application_1111111111_1111", "YARN_CONTAINER", 12345L,
"container_1111111111_1111_01_000001");
uid = TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context);
assertEquals("yarn_cluster!root!hive_join!1234!application_1111111111_1111!"
+ "YARN_CONTAINER!container_1111111111_1111_01_000001", uid);
+ "YARN_CONTAINER!12345!container_1111111111_1111_01_000001", uid);
assertEquals(
context, TimelineUIDConverter.GENERIC_ENTITY_UID.decodeUID(uid));
context = new TimelineReaderContext("yarn_cluster", null, null, null,
"application_1111111111_1111", "YARN_CONTAINER",
"application_1111111111_1111", "YARN_CONTAINER", 54321L,
"container_1111111111_1111_01_000001");
uid = TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context);
assertEquals("yarn_cluster!application_1111111111_1111!YARN_CONTAINER!" +
"container_1111111111_1111_01_000001", uid);
"54321!container_1111111111_1111_01_000001", uid);
assertEquals(
context, TimelineUIDConverter.GENERIC_ENTITY_UID.decodeUID(uid));
}

View File

@ -139,6 +139,7 @@ public class TestRowKeys {
TimelineEntity entity = new TimelineEntity();
entity.setId("!ent!ity!!id!");
entity.setType("entity!Type");
entity.setIdPrefix(54321);
byte[] byteRowKey =
new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
@ -151,11 +152,13 @@ public class TestRowKeys {
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
assertEquals(entity.getType(), rowKey.getEntityType());
assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue());
assertEquals(entity.getId(), rowKey.getEntityId());
byte[] byteRowKeyPrefix =
new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
APPLICATION_ID, entity.getType()).getRowKeyPrefix();
APPLICATION_ID, entity.getType(), null, null)
.getRowKeyPrefix();
byte[][] splits =
Separator.QUALIFIERS.split(
byteRowKeyPrefix,
@ -163,8 +166,7 @@ public class TestRowKeys {
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE });
assertEquals(8, splits.length);
assertEquals(entity.getIdPrefix(), splits[7].length);
assertEquals(7, splits.length);
assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4]));
assertEquals(entity.getType(),
Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));