From 1e5e4a52f7a93d6713d72070d2bde875d0256d10 Mon Sep 17 00:00:00 2001 From: Varun Saxena Date: Sat, 7 Jan 2017 01:38:36 +0530 Subject: [PATCH] YARN-5585. [Atsv2] Reader side changes for entity prefix and support for pagination via additional filters (Rohith Sharma K S via Varun Saxena) (cherry picked from commit 9a2f288a6a9e30376b7fd99bf8824184a34a54c9) --- .../timelineservice/TimelineEntity.java | 16 +- ...TimelineReaderWebServicesHBaseStorage.java | 102 +++++++- .../reader/TimelineEntityFilters.java | 53 +++- .../reader/TimelineReaderContext.java | 20 +- .../reader/TimelineReaderManager.java | 1 + .../reader/TimelineReaderWebServices.java | 230 ++++++++++++++---- .../TimelineReaderWebServicesUtils.java | 13 +- .../reader/TimelineUIDConverter.java | 19 +- .../reader/filter/TimelineFilterUtils.java | 17 ++ .../storage/TimelineReader.java | 10 +- .../storage/entity/EntityRowKey.java | 26 +- .../storage/entity/EntityRowKeyPrefix.java | 13 +- .../reader/ApplicationEntityReader.java | 2 +- .../reader/FlowActivityEntityReader.java | 2 +- .../storage/reader/FlowRunEntityReader.java | 2 +- .../storage/reader/GenericEntityReader.java | 97 ++++++-- .../storage/reader/TimelineEntityReader.java | 29 +-- .../reader/TimelineEntityReaderFactory.java | 2 +- .../reader/TestTimelineUIDConverter.java | 8 +- .../storage/common/TestRowKeys.java | 8 +- 20 files changed, 511 insertions(+), 159 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java index 7a289b96d37..845e2cc547b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java @@ -549,20 +549,10 @@ public class TimelineEntity implements Comparable { public int compareTo(TimelineEntity other) { int comparison = getType().compareTo(other.getType()); if (comparison == 0) { - if (getCreatedTime() == null) { - if (other.getCreatedTime() == null) { - return getId().compareTo(other.getId()); - } else { - return 1; - } - } - if (other.getCreatedTime() == null) { + if (getIdPrefix() > other.getIdPrefix()) { + // Descending order by entity id prefix return -1; - } - if (getCreatedTime() > other.getCreatedTime()) { - // Order by created time desc - return -1; - } else if (getCreatedTime() < other.getCreatedTime()) { + } else if (getIdPrefix() < other.getIdPrefix()) { return 1; } else { return getId().compareTo(other.getId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index a83d2dc330a..fa35fc5a50e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -214,7 +214,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { entity4.addMetrics(metrics); te4.addEntity(entity4); - TimelineEntities te5 = new TimelineEntities(); + TimelineEntities userEntities = new TimelineEntities(); TimelineEntity entity5 = new TimelineEntity(); entity5.setId("entity1"); entity5.setType("type1"); @@ -270,7 +270,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { relatesTo1.put("type3", Sets.newHashSet("entity31", "entity35", "entity32", "entity33")); entity5.addRelatesToEntities(relatesTo1); - te5.addEntity(entity5); + userEntities.addEntity(entity5); TimelineEntity entity6 = new TimelineEntity(); entity6.setId("entity2"); @@ -329,7 +329,16 @@ public class TestTimelineReaderWebServicesHBaseStorage { relatesTo2.put("type6", Sets.newHashSet("entity61", "entity66")); relatesTo2.put("type3", Sets.newHashSet("entity31")); entity6.addRelatesToEntities(relatesTo2); - te5.addEntity(entity6); + userEntities.addEntity(entity6); + + for (long i = 1; i <= 10; i++) { + TimelineEntity userEntity = new TimelineEntity(); + userEntity.setType("entitytype"); + userEntity.setId("entityid-" + i); + userEntity.setIdPrefix(11 - i); + userEntity.setCreatedTime(System.currentTimeMillis()); + userEntities.addEntity(userEntity); + } HBaseTimelineWriterImpl hbi = null; Configuration c1 = util.getConfiguration(); @@ -342,7 +351,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { hbi.write(cluster, user, flow2, flowVersion2, runid2, entity3.getId(), te3); hbi.write(cluster, user, flow, flowVersion, runid, - "application_1111111111_1111", te5); + "application_1111111111_1111", userEntities); hbi.flush(); } finally { if (hbi != null) { @@ -784,7 +793,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertEquals(TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID( new TimelineReaderContext(context.getClusterId(), context.getUserId(), context.getFlowName(), - context.getFlowRunId(), context.getAppId(), "type1", + context.getFlowRunId(), context.getAppId(), "type1", + entity.getIdPrefix(), entity.getId())), entityUID); } } @@ -860,8 +870,8 @@ public class TestTimelineReaderWebServicesHBaseStorage { String uid = (String) entity.getInfo().get(TimelineReaderManager.UID_KEY); assertNotNull(uid); - assertTrue(uid.equals(appUIDWithFlowInfo + "!type1!entity1") || - uid.equals(appUIDWithFlowInfo + "!type1!entity2")); + assertTrue(uid.equals(appUIDWithFlowInfo + "!type1!0!entity1") + || uid.equals(appUIDWithFlowInfo + "!type1!0!entity2")); } String appUIDWithoutFlowInfo = "cluster1!application_1111111111_1111"; @@ -887,11 +897,11 @@ public class TestTimelineReaderWebServicesHBaseStorage { String uid = (String) entity.getInfo().get(TimelineReaderManager.UID_KEY); assertNotNull(uid); - assertTrue(uid.equals(appUIDWithoutFlowInfo + "!type1!entity1") || - uid.equals(appUIDWithoutFlowInfo + "!type1!entity2")); + assertTrue(uid.equals(appUIDWithoutFlowInfo + "!type1!0!entity1") + || uid.equals(appUIDWithoutFlowInfo + "!type1!0!entity2")); } - String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!entity1"; + String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!0!entity1"; uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+ "entity-uid/" + entityUIDWithFlowInfo); resp = getResponse(client, uri); @@ -901,7 +911,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertEquals("entity1", singleEntity1.getId()); String entityUIDWithoutFlowInfo = - appUIDWithoutFlowInfo + "!type1!entity1"; + appUIDWithoutFlowInfo + "!type1!0!entity1"; uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+ "entity-uid/" + entityUIDWithoutFlowInfo); resp = getResponse(client, uri); @@ -2162,4 +2172,74 @@ public class TestTimelineReaderWebServicesHBaseStorage { server = null; } } + + @Test + public void testGenericEntitiesForPagination() throws Exception { + Client client = createClient(); + try { + int limit = 10; + String queryParam = "?limit=" + limit; + String resourceUri = "http://localhost:" + serverPort + "/ws/v2/" + + "timeline/clusters/cluster1/apps/application_1111111111_1111/" + + "entities/entitytype"; + URI uri = URI.create(resourceUri + queryParam); + + ClientResponse resp = getResponse(client, uri); + List entities = + resp.getEntity(new GenericType>() { + }); + // verify for entity-10 to entity-1 in descending order. + verifyPaginatedEntites(entities, limit, limit); + + limit = 4; + queryParam = "?limit=" + limit; + uri = URI.create(resourceUri + queryParam); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + // verify for entity-10 to entity-7 in descending order. + TimelineEntity entity = verifyPaginatedEntites(entities, limit, 10); + + queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix() + + "&&fromid=" + entity.getId(); + uri = URI.create(resourceUri + queryParam); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + // verify for entity-7 to entity-4 in descending order. + entity = verifyPaginatedEntites(entities, limit, 7); + + queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix(); + uri = URI.create(resourceUri + queryParam); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + // verify for entity-4 to entity-1 in descending order. + entity = verifyPaginatedEntites(entities, limit, 4); + + queryParam = "?limit=" + limit + "&&fromidprefix=" + entity.getIdPrefix(); + uri = URI.create(resourceUri + queryParam); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + // always entity-1 will be retrieved + entity = verifyPaginatedEntites(entities, 1, 1); + } finally { + client.destroy(); + } + } + + private TimelineEntity verifyPaginatedEntites(List entities, + int limit, int startFrom) { + assertNotNull(entities); + assertEquals(limit, entities.size()); + TimelineEntity entity = null; + for (TimelineEntity timelineEntity : entities) { + assertEquals("entitytype", timelineEntity.getType()); + assertEquals("entityid-" + startFrom, timelineEntity.getId()); + assertEquals(11 - startFrom--, timelineEntity.getIdPrefix()); + entity = timelineEntity; + } + return entity; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java index 8f2b7252146..79a83c6ffc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java @@ -35,10 +35,10 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyVa *
  • limit - A limit on the number of entities to return. If null or * {@literal < 0}, defaults to {@link #DEFAULT_LIMIT}. The maximum possible * value for limit can be {@link Long#MAX_VALUE}.
  • - *
  • createdTimeBegin - Matched entities should not be created - * before this timestamp. If null or {@literal <=0}, defaults to 0.
  • - *
  • createdTimeEnd - Matched entities should not be created after - * this timestamp. If null or {@literal <=0}, defaults to + *
  • createdTimeBegin - Matched entities should not be created before + * this timestamp. If null or {@literal <=0}, defaults to 0.
  • + *
  • createdTimeEnd - Matched entities should not be created after this + * timestamp. If null or {@literal <=0}, defaults to * {@link Long#MAX_VALUE}.
  • *
  • relatesTo - Matched entities should or should not relate to given * entities depending on what's specified in the filter. The entities in @@ -99,6 +99,19 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyVa * filter list, event filters can be evaluated with logical AND/OR and we can * create a hierarchy of these {@link TimelineExistsFilter} objects. If null or * empty, the filter is not applied.
  • + *
  • fromIdPrefix - If specified, retrieve entities with an id prefix + * greater than or equal to the specified fromIdPrefix. If fromIdPrefix is same + * for all entities of a given entity type, then the user must provide fromId as + * a filter to denote the start entity from which further entities will be + * fetched. fromIdPrefix is mandatory even in the case the entity id prefix is + * not used and should be set to 0.
  • + *
  • fromId - If specified along with fromIdPrefix, retrieve entities + * with an id prefix greater than or equal to specified id prefix in + * fromIdPrefix and entity id lexicographically greater than or equal to entity + * id specified in fromId. Please note than fromIdPrefix is mandatory if fromId + * is specified, otherwise, the filter will be ignored. It is recommended to + * provide both fromIdPrefix and fromId filters for more accurate results as id + * prefix may not be unique for an entity.
  • * */ @Private @@ -113,9 +126,12 @@ public class TimelineEntityFilters { private TimelineFilterList configFilters; private TimelineFilterList metricFilters; private TimelineFilterList eventFilters; + private Long fromIdPrefix; + private String fromId; private static final long DEFAULT_BEGIN_TIME = 0L; private static final long DEFAULT_END_TIME = Long.MAX_VALUE; + /** * Default limit of number of entities to return for getEntities API. */ @@ -125,6 +141,19 @@ public class TimelineEntityFilters { this(null, null, null, null, null, null, null, null, null); } + public TimelineEntityFilters(Long entityLimit, Long timeBegin, Long timeEnd, + TimelineFilterList entityRelatesTo, TimelineFilterList entityIsRelatedTo, + TimelineFilterList entityInfoFilters, + TimelineFilterList entityConfigFilters, + TimelineFilterList entityMetricFilters, + TimelineFilterList entityEventFilters, Long fromidprefix, String fromid) { + this(entityLimit, timeBegin, timeEnd, entityRelatesTo, entityIsRelatedTo, + entityInfoFilters, entityConfigFilters, entityMetricFilters, + entityEventFilters); + this.fromIdPrefix = fromidprefix; + this.fromId = fromid; + } + public TimelineEntityFilters( Long entityLimit, Long timeBegin, Long timeEnd, TimelineFilterList entityRelatesTo, @@ -239,4 +268,20 @@ public class TimelineEntityFilters { public void setEventFilters(TimelineFilterList filters) { this.eventFilters = filters; } + + public String getFromId() { + return fromId; + } + + public void setFromId(String fromId) { + this.fromId = fromId; + } + + public Long getFromIdPrefix() { + return fromIdPrefix; + } + + public void setFromIdPrefix(Long fromIdPrefix) { + this.fromIdPrefix = fromIdPrefix; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java index 633bb232e4f..5f308cbcb12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java @@ -31,6 +31,7 @@ public class TimelineReaderContext extends TimelineContext { private String entityType; private String entityId; + private Long entityIdPrefix; public TimelineReaderContext(String clusterId, String userId, String flowName, Long flowRunId, String appId, String entityType, String entityId) { super(clusterId, userId, flowName, flowRunId, appId); @@ -38,16 +39,25 @@ public class TimelineReaderContext extends TimelineContext { this.entityId = entityId; } + public TimelineReaderContext(String clusterId, String userId, String flowName, + Long flowRunId, String appId, String entityType, Long entityIdPrefix, + String entityId) { + this(clusterId, userId, flowName, flowRunId, appId, entityType, entityId); + this.entityIdPrefix = entityIdPrefix; + } + public TimelineReaderContext(TimelineReaderContext other) { this(other.getClusterId(), other.getUserId(), other.getFlowName(), other.getFlowRunId(), other.getAppId(), other.getEntityType(), - other.getEntityId()); + other.getEntityIdPrefix(), other.getEntityId()); } @Override public int hashCode() { final int prime = 31; int result = super.hashCode(); + result = prime * result + + ((entityIdPrefix == null) ? 0 : entityIdPrefix.hashCode()); result = prime * result + ((entityId == null) ? 0 : entityId.hashCode()); result = prime * result + ((entityType == null) ? 0 : entityType.hashCode()); @@ -95,4 +105,12 @@ public class TimelineReaderContext extends TimelineContext { public void setEntityId(String id) { this.entityId = id; } + + public Long getEntityIdPrefix() { + return entityIdPrefix; + } + + public void setEntityIdPrefix(Long entityIdPrefix) { + this.entityIdPrefix = entityIdPrefix; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java index 6e8b823bdd9..66e4cbfa548 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java @@ -113,6 +113,7 @@ public class TimelineReaderManager extends AbstractService { } } context.setEntityType(entity.getType()); + context.setEntityIdPrefix(entity.getIdPrefix()); context.setEntityId(entity.getId()); entity.setUID(UID_KEY, TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index bffb01e681d..994c27647c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -264,6 +264,20 @@ public class TimelineReaderWebServices { * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromIdPrefix If specified, retrieve entities with an id prefix + * greater than or equal to the specified fromIdPrefix. If fromIdPrefix + * is same for all entities of a given entity type, then the user must + * provide fromId as a filter to denote the start entity from which + * further entities will be fetched. fromIdPrefix is mandatory even + * in the case the entity id prefix is not used and should be set to 0. + * @param fromId If specified along with fromIdPrefix, retrieve entities with + * an id prefix greater than or equal to specified id prefix in + * fromIdPrefix and entity id lexicographically greater than or equal + * to entity id specified in fromId. Please note than fromIdPrefix is + * mandatory if fromId is specified, otherwise, the filter will be + * ignored. It is recommended to provide both fromIdPrefix and fromId + * filters for more accurate results as id prefix may not be unique + * for an entity. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances of the given entity type @@ -294,7 +308,9 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -317,7 +333,8 @@ public class TimelineReaderWebServices { entities = timelineReaderManager.getEntities(context, TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, - infofilters, conffilters, metricfilters, eventfilters), + infofilters, conffilters, metricfilters, eventfilters, + fromIdPrefix, fromId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -400,6 +417,20 @@ public class TimelineReaderWebServices { * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromIdPrefix If specified, retrieve entities with an id prefix + * greater than or equal to the specified fromIdPrefix. If fromIdPrefix + * is same for all entities of a given entity type, then the user must + * provide fromId as a filter to denote the start entity from which + * further entities will be fetched. fromIdPrefix is mandatory even + * in the case the entity id prefix is not used and should be set to 0. + * @param fromId If specified along with fromIdPrefix, retrieve entities with + * an id prefix greater than or equal to specified id prefix in + * fromIdPrefix and entity id lexicographically greater than or equal + * to entity id specified in fromId. Please note than fromIdPrefix is + * mandatory if fromId is specified, otherwise, the filter will be + * ignored. It is recommended to provide both fromIdPrefix and fromId + * filters for more accurate results as id prefix may not be unique + * for an entity. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances of the given entity type @@ -435,11 +466,14 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { return getEntities(req, res, null, appId, entityType, userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix, + fromId); } /** @@ -510,6 +544,20 @@ public class TimelineReaderWebServices { * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromIdPrefix If specified, retrieve entities with an id prefix + * greater than or equal to the specified fromIdPrefix. If fromIdPrefix + * is same for all entities of a given entity type, then the user must + * provide fromId as a filter to denote the start entity from which + * further entities will be fetched. fromIdPrefix is mandatory even + * in the case the entity id prefix is not used and should be set to 0. + * @param fromId If specified along with fromIdPrefix, retrieve entities with + * an id prefix greater than or equal to specified id prefix in + * fromIdPrefix and entity id lexicographically greater than or equal + * to entity id specified in fromId. Please note than fromIdPrefix is + * mandatory if fromId is specified, otherwise, the filter will be + * ignored. It is recommended to provide both fromIdPrefix and fromId + * filters for more accurate results as id prefix may not be unique + * for an entity. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances of the given entity type @@ -546,7 +594,9 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -559,12 +609,14 @@ public class TimelineReaderWebServices { TimelineReaderManager timelineReaderManager = getTimelineReaderManager(); Set entities = null; try { - entities = timelineReaderManager.getEntities( - TimelineReaderWebServicesUtils.createTimelineReaderContext( - clusterId, userId, flowName, flowRunId, appId, entityType, null), + TimelineReaderContext context = TimelineReaderWebServicesUtils + .createTimelineReaderContext(clusterId, userId, flowName, flowRunId, + appId, entityType, null, null); + entities = timelineReaderManager.getEntities(context, TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, - infofilters, conffilters, metricfilters, eventfilters), + infofilters, conffilters, metricfilters, eventfilters, + fromIdPrefix, fromId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -703,6 +755,8 @@ public class TimelineReaderWebServices { * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param entityIdPrefix Defines the id prefix for the entity to be fetched. + * If specified, then entity retrieval will be faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * TimelineEntity instance is returned.
    @@ -729,10 +783,11 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { return getEntity(req, res, null, appId, entityType, entityId, userId, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, - metricsLimit); + metricsLimit, entityIdPrefix); } /** @@ -774,6 +829,8 @@ public class TimelineReaderWebServices { * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param entityIdPrefix Defines the id prefix for the entity to be fetched. + * If specified, then entity retrieval will be faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * TimelineEntity instance is returned.
    @@ -801,7 +858,8 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -816,7 +874,8 @@ public class TimelineReaderWebServices { try { entity = timelineReaderManager.getEntity( TimelineReaderWebServicesUtils.createTimelineReaderContext( - clusterId, userId, flowName, flowRunId, appId, entityType, entityId), + clusterId, userId, flowName, flowRunId, appId, entityType, + entityIdPrefix, entityId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -992,7 +1051,7 @@ public class TimelineReaderWebServices { entity = timelineReaderManager.getEntity( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, userId, flowName, flowRunId, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( null, metricsToRetrieve, null, null)); } catch (Exception e) { @@ -1081,7 +1140,7 @@ public class TimelineReaderWebServices { entities = timelineReaderManager.getEntities(context, TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, null, null, null, - null, null, null), + null, null, null, null, null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( null, metricsToRetrieve, fields, null)); } catch (Exception e) { @@ -1217,10 +1276,10 @@ public class TimelineReaderWebServices { entities = timelineReaderManager.getEntities( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, userId, flowName, null, null, - TimelineEntityType.YARN_FLOW_RUN.toString(), null), + TimelineEntityType.YARN_FLOW_RUN.toString(), null, null), TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, null, null, null, - null, null, null), + null, null, null, null, null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( null, metricsToRetrieve, fields, null)); } catch (Exception e) { @@ -1339,13 +1398,14 @@ public class TimelineReaderWebServices { DateRange range = parseDateRange(dateRange); TimelineEntityFilters entityFilters = TimelineReaderWebServicesUtils.createTimelineEntityFilters( - limit, null, null, null, null, null, null, null, null); + limit, null, null, null, null, null, null, null, null, null, + null); entityFilters.setCreatedTimeBegin(range.dateStart); entityFilters.setCreatedTimeEnd(range.dateEnd); entities = timelineReaderManager.getEntities( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, null, null, null, null, - TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), + TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null), entityFilters, TimelineReaderWebServicesUtils. createTimelineDataToRetrieve(null, null, null, null)); } catch (Exception e) { @@ -1584,7 +1644,7 @@ public class TimelineReaderWebServices { entity = timelineReaderManager.getEntity( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, userId, flowName, flowRunId, appId, - TimelineEntityType.YARN_APPLICATION.toString(), null), + TimelineEntityType.YARN_APPLICATION.toString(), null, null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -1710,7 +1770,8 @@ public class TimelineReaderWebServices { entities = timelineReaderManager.getEntities(context, TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, - infofilters, conffilters, metricfilters, eventfilters), + infofilters, conffilters, metricfilters, eventfilters, null, + null), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -1822,7 +1883,7 @@ public class TimelineReaderWebServices { TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); } /** @@ -1924,7 +1985,7 @@ public class TimelineReaderWebServices { TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); } /** @@ -2020,7 +2081,7 @@ public class TimelineReaderWebServices { TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); } /** @@ -2118,7 +2179,7 @@ public class TimelineReaderWebServices { TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); } /** @@ -2189,6 +2250,21 @@ public class TimelineReaderWebServices { * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param fromIdPrefix If specified, retrieve entities with an id prefix + * greater than or equal to the specified fromIdPrefix. If + * fromIdPrefix is same for all entities of a given entity type, then + * the user must provide fromId as a filter to denote the start + * entity from which further entities will be fetched. fromIdPrefix + * is mandatory even in the case the entity id prefix is not used and + * should be set to 0. + * @param fromId If specified along with fromIdPrefix, retrieve entities with + * an id prefix greater than or equal to specified id prefix in + * fromIdPrefix and entity id lexicographically greater than or equal + * to entity id specified in fromId. Please note than fromIdPrefix is + * mandatory if fromId is specified, otherwise, the filter will be + * ignored. It is recommended to provide both fromIdPrefix and fromId + * filters for more accurate results as id prefix may not be unique + * for an entity. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of TimelineEntity instances of the app-attempt @@ -2221,12 +2297,14 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { return getAppAttempts(req, res, null, appId, userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, confsToRetrieve, - metricsToRetrieve, fields, metricsLimit); + metricsToRetrieve, fields, metricsLimit, fromIdPrefix, fromId); } /** @@ -2298,6 +2376,21 @@ public class TimelineReaderWebServices { * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param fromIdPrefix If specified, retrieve entities with an id prefix + * greater than or equal to the specified fromIdPrefix. If + * fromIdPrefix is same for all entities of a given entity type, then + * the user must provide fromId as a filter to denote the start + * entity from which further entities will be fetched. fromIdPrefix + * is mandatory even in the case the entity id prefix is not used and + * should be set to 0. + * @param fromId If specified along with fromIdPrefix, retrieve entities with + * an id prefix greater than or equal to specified id prefix in + * fromIdPrefix and entity id lexicographically greater than or equal + * to entity id specified in fromId. Please note than fromIdPrefix is + * mandatory if fromId is specified, otherwise, the filter will be + * ignored. It is recommended to provide both fromIdPrefix and fromId + * filters for more accurate results as id prefix may not be unique + * for an entity. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of TimelineEntity instances of the app-attempts @@ -2331,13 +2424,16 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { return getEntities(req, res, clusterId, appId, TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, + fromIdPrefix, fromId); } /** @@ -2380,6 +2476,8 @@ public class TimelineReaderWebServices { * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param entityIdPrefix Defines the id prefix for the entity to be fetched. + * If specified, then entity retrieval will be faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * TimelineEntity instance is returned.
    @@ -2404,9 +2502,11 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { return getAppAttempt(req, res, null, appId, appAttemptId, userId, flowName, - flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit, + entityIdPrefix); } /** @@ -2449,6 +2549,8 @@ public class TimelineReaderWebServices { * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param entityIdPrefix Defines the id prefix for the entity to be fetched. + * If specified, then entity retrieval will be faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * TimelineEntity instance is returned.
    @@ -2475,11 +2577,12 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { return getEntity(req, res, clusterId, appId, TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttemptId, userId, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, - metricsLimit); + metricsLimit, entityIdPrefix); } /** @@ -2552,6 +2655,21 @@ public class TimelineReaderWebServices { * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param fromIdPrefix If specified, retrieve entities with an id prefix + * greater than or equal to the specified fromIdPrefix. If + * fromIdPrefix is same for all entities of a given entity type, then + * the user must provide fromId as a filter to denote the start + * entity from which further entities will be fetched. fromIdPrefix + * is mandatory even in the case the entity id prefix is not used and + * should be set to 0. + * @param fromId If specified along with fromIdPrefix, retrieve entities with + * an id prefix greater than or equal to specified id prefix in + * fromIdPrefix and entity id lexicographically greater than or equal + * to entity id specified in fromId. Please note than fromIdPrefix is + * mandatory if fromId is specified, otherwise, the filter will be + * ignored. It is recommended to provide both fromIdPrefix and fromId + * filters for more accurate results as id prefix may not be unique + * for an entity. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of TimelineEntity instances of the containers @@ -2585,11 +2703,14 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { return getContainers(req, res, null, appId, appattemptId, userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix, + fromId); } /** @@ -2663,6 +2784,21 @@ public class TimelineReaderWebServices { * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param fromIdPrefix If specified, retrieve entities with an id prefix + * greater than or equal to the specified fromIdPrefix. If + * fromIdPrefix is same for all entities of a given entity type, then + * the user must provide fromId as a filter to denote the start + * entity from which further entities will be fetched. fromIdPrefix + * is mandatory even in the case the entity id prefix is not used and + * should be set to 0. + * @param fromId If specified along with fromIdPrefix, retrieve entities with + * an id prefix greater than or equal to specified id prefix in + * fromIdPrefix and entity id lexicographically greater than or equal + * to entity id specified in fromId. Please note than fromIdPrefix is + * mandatory if fromId is specified, otherwise, the filter will be + * ignored. It is recommended to provide both fromIdPrefix and fromId + * filters for more accurate results as id prefix may not be unique + * for an entity. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of TimelineEntity instances of the containers @@ -2698,7 +2834,9 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromidprefix") String fromIdPrefix, + @QueryParam("fromid") String fromId) { String entityType = TimelineEntityType.YARN_CONTAINER.toString(); String parentEntityType = @@ -2716,7 +2854,8 @@ public class TimelineReaderWebServices { return getEntities(req, res, clusterId, appId, entityType, userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilter, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, fromIdPrefix, + fromId); } /** @@ -2758,6 +2897,8 @@ public class TimelineReaderWebServices { * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param entityIdPrefix Defines the id prefix for the entity to be fetched. + * If specified, then entity retrieval will be faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing * TimelineEntity instance is returned.
    @@ -2782,9 +2923,11 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { return getContainer(req, res, null, appId, containerId, userId, flowName, - flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit); + flowRunId, confsToRetrieve, metricsToRetrieve, fields, metricsLimit, + entityIdPrefix); } /** @@ -2827,6 +2970,8 @@ public class TimelineReaderWebServices { * have to be retrieved, then metricsLimit will be considered as 1 * i.e. latest single value of metric(s) will be returned. (Optional * query param). + * @param entityIdPrefix Defines the id prefix for the entity to be fetched. + * If specified, then entity retrieval will be faster. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * TimelineEntity instance is returned.
    @@ -2853,11 +2998,12 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("entityidprefix") String entityIdPrefix) { return getEntity(req, res, clusterId, appId, TimelineEntityType.YARN_CONTAINER.toString(), containerId, userId, flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields, - metricsLimit); + metricsLimit, entityIdPrefix); } /** @@ -2952,7 +3098,7 @@ public class TimelineReaderWebServices { results = timelineReaderManager.getEntityTypes( TimelineReaderWebServicesUtils.createTimelineReaderContext( clusterId, userId, flowName, flowRunId, appId, - null, null)); + null, null, null)); } catch (Exception e) { handleException(e, url, startTime, "flowrunid"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java index 7fc8cb8483c..1a518d0e136 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java @@ -23,6 +23,7 @@ import java.util.EnumSet; import javax.servlet.http.HttpServletRequest; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; @@ -49,10 +50,10 @@ final class TimelineReaderWebServicesUtils { */ static TimelineReaderContext createTimelineReaderContext(String clusterId, String userId, String flowName, String flowRunId, String appId, - String entityType, String entityId) { + String entityType, String entityIdPrefix, String entityId) { return new TimelineReaderContext(parseStr(clusterId), parseStr(userId), parseStr(flowName), parseLongStr(flowRunId), parseStr(appId), - parseStr(entityType), parseStr(entityId)); + parseStr(entityType), parseLongStr(entityIdPrefix), parseStr(entityId)); } /** @@ -73,12 +74,14 @@ final class TimelineReaderWebServicesUtils { static TimelineEntityFilters createTimelineEntityFilters(String limit, String createdTimeStart, String createdTimeEnd, String relatesTo, String isRelatedTo, String infofilters, String conffilters, - String metricfilters, String eventfilters) throws TimelineParseException { + String metricfilters, String eventfilters, String fromidprefix, + String fromid) throws TimelineParseException { return new TimelineEntityFilters(parseLongStr(limit), parseLongStr(createdTimeStart), parseLongStr(createdTimeEnd), parseRelationFilters(relatesTo), parseRelationFilters(isRelatedTo), parseKVFilters(infofilters, false), parseKVFilters(conffilters, true), - parseMetricFilters(metricfilters), parseEventFilters(eventfilters)); + parseMetricFilters(metricfilters), parseEventFilters(eventfilters), + parseLongStr(fromidprefix), parseStr(fromid)); } /** @@ -207,7 +210,7 @@ final class TimelineReaderWebServicesUtils { * @return trimmed string if string is not null, null otherwise. */ static String parseStr(String str) { - return str == null ? null : str.trim(); + return StringUtils.trimToNull(str); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java index 08e5405ff52..780cfd063d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java @@ -155,12 +155,14 @@ enum TimelineUIDConverter { // Flow information exists. String[] entityTupleArr = {context.getClusterId(), context.getUserId(), context.getFlowName(), context.getFlowRunId().toString(), - context.getAppId(), context.getEntityType(), context.getEntityId()}; + context.getAppId(), context.getEntityType(), + context.getEntityIdPrefix().toString(), context.getEntityId() }; return joinAndEscapeUIDParts(entityTupleArr); } else { // Only entity and app information exists. Flow info does not exist. String[] entityTupleArr = {context.getClusterId(), context.getAppId(), - context.getEntityType(), context.getEntityId()}; + context.getEntityType(), context.getEntityIdPrefix().toString(), + context.getEntityId() }; return joinAndEscapeUIDParts(entityTupleArr); } } @@ -171,20 +173,21 @@ enum TimelineUIDConverter { return null; } List entityTupleList = splitUID(uId); - // Should have 7 parts i.e. cluster, user, flow name, flowrun id, app id, - // entity type and entity id OR should have 4 parts i.e. cluster, app id, + // Should have 8 parts i.e. cluster, user, flow name, flowrun id, app id, + // entity type and entity id OR should have 5 parts i.e. cluster, app id, // entity type and entity id. - if (entityTupleList.size() == 7) { + if (entityTupleList.size() == 8) { // Flow information exists. return new TimelineReaderContext(entityTupleList.get(0), entityTupleList.get(1), entityTupleList.get(2), Long.parseLong(entityTupleList.get(3)), entityTupleList.get(4), - entityTupleList.get(5), entityTupleList.get(6)); - } else if (entityTupleList.size() == 4) { + entityTupleList.get(5), Long.parseLong(entityTupleList.get(6)), + entityTupleList.get(7)); + } else if (entityTupleList.size() == 5) { // Flow information does not exist. return new TimelineReaderContext(entityTupleList.get(0), null, null, null, entityTupleList.get(1), entityTupleList.get(2), - entityTupleList.get(3)); + Long.parseLong(entityTupleList.get(3)), entityTupleList.get(4)); } else { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java index cccae267d88..8e38e95c74d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java @@ -180,6 +180,23 @@ public final class TimelineFilterUtils { return list; } + /** + * Creates a HBase {@link SingleColumnValueFilter} with specified column. + * @param Describes the type of column prefix. + * @param column Column which value to be filtered. + * @param value Value to be filtered. + * @param op Compare operator + * @return a SingleColumnValue Filter + * @throws IOException if any exception. + */ + public static Filter createHBaseSingleColValueFilter(Column column, + Object value, CompareOp op) throws IOException { + Filter singleColValFilter = createHBaseSingleColValueFilter( + column.getColumnFamilyBytes(), column.getColumnQualifierBytes(), + column.getValueConverter().encodeValue(value), op, true); + return singleColValFilter; + } + /** * Creates a HBase {@link SingleColumnValueFilter}. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java index d7c1552d0aa..1e77155b007 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java @@ -125,8 +125,8 @@ public interface TimelineReader extends Service { *
  • flowRunId - Context flow run id.
  • *
  • appId - Context app id.
  • * - * Although entityId is also part of context, it has no meaning for - * getEntities.
    + * Although entityIdPrefix and entityId are also part of context, + * it has no meaning for getEntities.
    * Fields in context which are mandatory depends on entity type. Entity * type is always mandatory. In addition to entity type, below is the list * of context fields which are mandatory, based on entity type.
    @@ -161,8 +161,10 @@ public interface TimelineReader extends Service { * {@link TimelineDataToRetrieve} for details. * @return A set of TimelineEntity instances of the given entity * type in the given context scope which matches the given predicates - * ordered by created time, descending. Each entity will only contain the - * metadata(id, type and created time) plus the given fields to retrieve. + * ordered by enitityIdPrefix(for generic entities only). + * Each entity will only contain + * the metadata(id, type , idPrefix and created time) plus the given + * fields to retrieve. *
    * If entityType is YARN_FLOW_ACTIVITY, entities returned are of type * FlowActivityEntity.
    diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index 10aeec446a3..a8f1d0cd99d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -33,13 +33,13 @@ public class EntityRowKey { private final Long flowRunId; private final String appId; private final String entityType; - private final long entityIdPrefix; + private final Long entityIdPrefix; private final String entityId; private final KeyConverter entityRowKeyConverter = new EntityRowKeyConverter(); public EntityRowKey(String clusterId, String userId, String flowName, - Long flowRunId, String appId, String entityType, long entityIdPrefix, + Long flowRunId, String appId, String entityType, Long entityIdPrefix, String entityId) { this.clusterId = clusterId; this.userId = userId; @@ -79,7 +79,7 @@ public class EntityRowKey { return entityId; } - public long getEntityIdPrefix() { + public Long getEntityIdPrefix() { return entityIdPrefix; } @@ -180,14 +180,24 @@ public class EntityRowKey { Separator.encode(rowKey.getEntityType(), Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); + if (rowKey.getEntityIdPrefix() == null) { + return Separator.QUALIFIERS.join(first, second, third, entityType, + Separator.EMPTY_BYTES); + } + byte[] enitityIdPrefix = Bytes.toBytes(rowKey.getEntityIdPrefix()); - byte[] entityId = - rowKey.getEntityId() == null ? Separator.EMPTY_BYTES : Separator - .encode(rowKey.getEntityId(), Separator.SPACE, Separator.TAB, - Separator.QUALIFIERS); + if (rowKey.getEntityId() == null) { + return Separator.QUALIFIERS.join(first, second, third, entityType, + enitityIdPrefix, Separator.EMPTY_BYTES); + } + + byte[] entityId = Separator.encode(rowKey.getEntityId(), Separator.SPACE, + Separator.TAB, Separator.QUALIFIERS); + byte[] fourth = Separator.QUALIFIERS.join(entityType, enitityIdPrefix, entityId); + return Separator.QUALIFIERS.join(first, second, third, fourth); } @@ -227,7 +237,7 @@ public class EntityRowKey { Separator.decode(Bytes.toString(rowKeyComponents[5]), Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - long entityPrefixId = Bytes.toLong(rowKeyComponents[6]); + Long entityPrefixId = Bytes.toLong(rowKeyComponents[6]); String entityId = Separator.decode(Bytes.toString(rowKeyComponents[7]), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java index ef717c08937..47a1789bf95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyPrefix.java @@ -31,18 +31,20 @@ public class EntityRowKeyPrefix extends EntityRowKey implements * Creates a prefix which generates the following rowKeyPrefixes for the * entity table: * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}. - * * @param clusterId identifying the cluster * @param userId identifying the user * @param flowName identifying the flow * @param flowRunId identifying the individual run of this flow * @param appId identifying the application * @param entityType which entity type + * @param entityIdPrefix for entityId + * @param entityId for an entity */ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, - Long flowRunId, String appId, String entityType) { - // TODO YARN-5585, change prefix id from 0L - super(clusterId, userId, flowName, flowRunId, appId, entityType, 0L, null); + Long flowRunId, String appId, String entityType, Long entityIdPrefix, + String entityId) { + super(clusterId, userId, flowName, flowRunId, appId, entityType, + entityIdPrefix, entityId); } /** @@ -58,8 +60,7 @@ public class EntityRowKeyPrefix extends EntityRowKey implements */ public EntityRowKeyPrefix(String clusterId, String userId, String flowName, Long flowRunId, String appId) { - // TODO YARN-5585, change prefix id from 0L - super(clusterId, userId, flowName, flowRunId, appId, null, 0L, null); + this(clusterId, userId, flowName, flowRunId, appId, null, null, null); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index 42a6aa88a8d..1667f614391 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -64,7 +64,7 @@ class ApplicationEntityReader extends GenericEntityReader { public ApplicationEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve, true); + super(ctxt, entityFilters, toRetrieve); } public ApplicationEntityReader(TimelineReaderContext ctxt, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index 9ba5e38e37a..c741d0ed4b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -60,7 +60,7 @@ class FlowActivityEntityReader extends TimelineEntityReader { public FlowActivityEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve, true); + super(ctxt, entityFilters, toRetrieve); } public FlowActivityEntityReader(TimelineReaderContext ctxt, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index 986a28f43ac..9b8482c54b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -63,7 +63,7 @@ class FlowRunEntityReader extends TimelineEntityReader { public FlowRunEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve, true); + super(ctxt, entityFilters, toRetrieve); } public FlowRunEntityReader(TimelineReaderContext ctxt, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 0b3f7df4000..f6904c5a298 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; import java.util.EnumSet; +import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FamilyFilter; import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; @@ -70,9 +73,8 @@ class GenericEntityReader extends TimelineEntityReader { new StringKeyConverter(); public GenericEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, - boolean sortedKeys) { - super(ctxt, entityFilters, toRetrieve, sortedKeys); + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve); } public GenericEntityReader(TimelineReaderContext ctxt, @@ -424,18 +426,44 @@ class GenericEntityReader extends TimelineEntityReader { protected Result getResult(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { TimelineReaderContext context = getContext(); - byte[] rowKey = - new EntityRowKey(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId(), context.getAppId(), - // TODO YARN-5585, change prefix id from 0L - context.getEntityType(), 0L, context.getEntityId()).getRowKey(); + Result result = null; + if (context.getEntityIdPrefix() != null) { + byte[] rowKey = new EntityRowKey(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), + context.getEntityIdPrefix(), context.getEntityId()).getRowKey(); - Get get = new Get(rowKey); - get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); - if (filterList != null && !filterList.getFilters().isEmpty()) { - get.setFilter(filterList); + Get get = new Get(rowKey); + get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + result = getTable().getResult(hbaseConf, conn, get); + + } else { + // Prepare for range scan + // create single SingleColumnValueFilter and add to existing filters. + FilterList filter = new FilterList(Operator.MUST_PASS_ALL); + if (filterList != null && !filterList.getFilters().isEmpty()) { + filter.addFilter(filterList); + } + FilterList newFilter = new FilterList(); + newFilter.addFilter(TimelineFilterUtils.createHBaseSingleColValueFilter( + EntityColumn.ID, context.getEntityId(), CompareOp.EQUAL)); + newFilter.addFilter(new PageFilter(1)); + filter.addFilter(newFilter); + + ResultScanner results = getResults(hbaseConf, conn, filter); + try { + Iterator iterator = results.iterator(); + if (iterator.hasNext()) { + result = iterator.next(); + } + } finally { + results.close(); + } } - return getTable().getResult(hbaseConf, conn, get); + return result; } @Override @@ -445,11 +473,36 @@ class GenericEntityReader extends TimelineEntityReader { // and one type Scan scan = new Scan(); TimelineReaderContext context = getContext(); - RowKeyPrefix entityRowKeyPrefix = - new EntityRowKeyPrefix(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId(), context.getAppId(), - context.getEntityType()); - scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix()); + RowKeyPrefix entityRowKeyPrefix = null; + // default mode, will always scans from beginning of entity type. + if (getFilters() == null || getFilters().getFromIdPrefix() == null) { + entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), null, null); + scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix()); + } else { // pagination mode, will scan from given entityIdPrefix!enitityId + entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), + getFilters().getFromIdPrefix(), getFilters().getFromId()); + + // set start row + scan.setStartRow(entityRowKeyPrefix.getRowKeyPrefix()); + + // get the bytes for stop row + entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId(), context.getEntityType(), null, null); + + // set stop row + scan.setStopRow( + HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( + entityRowKeyPrefix.getRowKeyPrefix())); + + // set page filter to limit. This filter has to set only in pagination + // mode. + filterList.addFilter(new PageFilter(getFilters().getLimit())); + } scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); if (filterList != null && !filterList.getFilters().isEmpty()) { scan.setFilter(filterList); @@ -463,10 +516,10 @@ class GenericEntityReader extends TimelineEntityReader { return null; } TimelineEntity entity = new TimelineEntity(); - String entityType = EntityColumn.TYPE.readResult(result).toString(); - entity.setType(entityType); - String entityId = EntityColumn.ID.readResult(result).toString(); - entity.setId(entityId); + EntityRowKey parseRowKey = EntityRowKey.parseRowKey(result.getRow()); + entity.setType(parseRowKey.getEntityType()); + entity.setId(parseRowKey.getEntityId()); + entity.setIdPrefix(parseRowKey.getEntityIdPrefix().longValue()); TimelineEntityFilters filters = getFilters(); // fetch created time diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 8e3a3579962..4c88cd32761 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -21,11 +21,10 @@ import java.io.IOException; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Map; import java.util.NavigableMap; -import java.util.NavigableSet; import java.util.Set; -import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -74,14 +73,6 @@ public abstract class TimelineEntityReader extends */ private BaseTable table; - /** - * Specifies whether keys for this table are sorted in a manner where entities - * can be retrieved by created time. If true, it will be sufficient to collect - * the first results as specified by the limit. Otherwise all matched entities - * will be fetched and then limit applied. - */ - private boolean sortedKeys = false; - /** * Used to convert strings key components to and from storage format. */ @@ -95,15 +86,11 @@ public abstract class TimelineEntityReader extends * made. * @param entityFilters Filters which limit the entities returned. * @param toRetrieve Data to retrieve for each entity. - * @param sortedKeys Specifies whether key for this table are sorted or not. - * If sorted, entities can be retrieved by created time. */ protected TimelineEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, - boolean sortedKeys) { + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { super(ctxt); this.singleEntityRead = false; - this.sortedKeys = sortedKeys; this.dataToRetrieve = toRetrieve; this.filters = entityFilters; @@ -245,7 +232,7 @@ public abstract class TimelineEntityReader extends validateParams(); augmentParams(hbaseConf, conn); - NavigableSet entities = new TreeSet<>(); + Set entities = new LinkedHashSet<>(); FilterList filterList = createFilterList(); if (LOG.isDebugEnabled() && filterList != null) { LOG.debug("FilterList created for scan is - " + filterList); @@ -258,14 +245,8 @@ public abstract class TimelineEntityReader extends continue; } entities.add(entity); - if (!sortedKeys) { - if (entities.size() > filters.getLimit()) { - entities.pollLast(); - } - } else { - if (entities.size() == filters.getLimit()) { - break; - } + if (entities.size() == filters.getLimit()) { + break; } } return entities; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java index e90338e085c..16fffa4d816 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java @@ -83,7 +83,7 @@ public final class TimelineEntityReaderFactory { return new FlowRunEntityReader(context, filters, dataToRetrieve); } else { // assume we're dealing with a generic entity read - return new GenericEntityReader(context, filters, dataToRetrieve, false); + return new GenericEntityReader(context, filters, dataToRetrieve); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineUIDConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineUIDConverter.java index d5e791b4e3f..11dc9133675 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineUIDConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineUIDConverter.java @@ -53,19 +53,19 @@ public class TestTimelineUIDConverter { assertEquals(context, TimelineUIDConverter.APPLICATION_UID.decodeUID(uid)); context = new TimelineReaderContext("yarn_cluster", "root", "hive_join", - 1234L, "application_1111111111_1111", "YARN_CONTAINER", + 1234L, "application_1111111111_1111", "YARN_CONTAINER", 12345L, "container_1111111111_1111_01_000001"); uid = TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context); assertEquals("yarn_cluster!root!hive_join!1234!application_1111111111_1111!" - + "YARN_CONTAINER!container_1111111111_1111_01_000001", uid); + + "YARN_CONTAINER!12345!container_1111111111_1111_01_000001", uid); assertEquals( context, TimelineUIDConverter.GENERIC_ENTITY_UID.decodeUID(uid)); context = new TimelineReaderContext("yarn_cluster", null, null, null, - "application_1111111111_1111", "YARN_CONTAINER", + "application_1111111111_1111", "YARN_CONTAINER", 54321L, "container_1111111111_1111_01_000001"); uid = TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context); assertEquals("yarn_cluster!application_1111111111_1111!YARN_CONTAINER!" + - "container_1111111111_1111_01_000001", uid); + "54321!container_1111111111_1111_01_000001", uid); assertEquals( context, TimelineUIDConverter.GENERIC_ENTITY_UID.decodeUID(uid)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java index 6c6d1b3855d..7560f3345dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java @@ -139,6 +139,7 @@ public class TestRowKeys { TimelineEntity entity = new TimelineEntity(); entity.setId("!ent!ity!!id!"); entity.setType("entity!Type"); + entity.setIdPrefix(54321); byte[] byteRowKey = new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, @@ -151,11 +152,13 @@ public class TestRowKeys { assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); assertEquals(APPLICATION_ID, rowKey.getAppId()); assertEquals(entity.getType(), rowKey.getEntityType()); + assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue()); assertEquals(entity.getId(), rowKey.getEntityId()); byte[] byteRowKeyPrefix = new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, - APPLICATION_ID, entity.getType()).getRowKeyPrefix(); + APPLICATION_ID, entity.getType(), null, null) + .getRowKeyPrefix(); byte[][] splits = Separator.QUALIFIERS.split( byteRowKeyPrefix, @@ -163,8 +166,7 @@ public class TestRowKeys { Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE }); - assertEquals(8, splits.length); - assertEquals(entity.getIdPrefix(), splits[7].length); + assertEquals(7, splits.length); assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4])); assertEquals(entity.getType(), Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));