diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index fa35fc5a50e..485c19172a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -40,6 +40,8 @@ import javax.ws.rs.core.MediaType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.junit.After; import org.junit.AfterClass; @@ -352,6 +355,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { flowVersion2, runid2, entity3.getId(), te3); hbi.write(cluster, user, flow, flowVersion, runid, "application_1111111111_1111", userEntities); + writeApplicationEntities(hbi); hbi.flush(); } finally { if (hbi != null) { @@ -360,6 +364,35 @@ public class TestTimelineReaderWebServicesHBaseStorage { } } + static void writeApplicationEntities(HBaseTimelineWriterImpl hbi) + throws IOException { + long currentTimeMillis = System.currentTimeMillis(); + int count = 1; + for (long i = 1; i <= 3; i++) { + for (int j = 1; j <= 5; j++) { + TimelineEntities te = new TimelineEntities(); + ApplicationId appId = + BuilderUtils.newApplicationId(currentTimeMillis, count++); + ApplicationEntity appEntity = new ApplicationEntity(); + appEntity.setId(appId.toString()); + appEntity.setCreatedTime(currentTimeMillis); + + TimelineEvent created = new TimelineEvent(); + created.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + created.setTimestamp(currentTimeMillis); + appEntity.addEvent(created); + TimelineEvent finished = new TimelineEvent(); + finished.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + finished.setTimestamp(currentTimeMillis + i * j); + + appEntity.addEvent(finished); + te.addEntity(appEntity); + hbi.write("cluster1", "user1", "flow1", "CF7022C10F1354", i, + appEntity.getId(), te); + } + } + } + @AfterClass public static void tearDown() throws Exception { util.shutdownMiniCluster(); @@ -697,7 +730,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { Set flowEntities = resp.getEntity(new GenericType>(){}); assertNotNull(flowEntities); - assertEquals(2, flowEntities.size()); + assertEquals(3, flowEntities.size()); List listFlowUIDs = new ArrayList(); for (FlowActivityEntity entity : flowEntities) { String flowUID = @@ -709,7 +742,9 @@ public class TestTimelineReaderWebServicesHBaseStorage { assertTrue((entity.getId().endsWith("@flow_name") && entity.getFlowRuns().size() == 2) || (entity.getId().endsWith("@flow_name2") && - entity.getFlowRuns().size() == 1)); + entity.getFlowRuns().size() == 1) + || (entity.getId().endsWith("@flow1") + && entity.getFlowRuns().size() == 3)); } // Query flowruns based on UID returned in query above. @@ -731,7 +766,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { flowRunUID); } } - assertEquals(3, listFlowRunUIDs.size()); + assertEquals(6, listFlowRunUIDs.size()); // Query single flowrun based on UIDs' returned in query to get flowruns. for (String flowRunUID : listFlowRunUIDs) { @@ -763,7 +798,7 @@ public class TestTimelineReaderWebServicesHBaseStorage { context.getFlowRunId(), entity.getId(), null, null)), appUID); } } - assertEquals(4, listAppUIDs.size()); + assertEquals(19, listAppUIDs.size()); // Query single app based on UIDs' returned in query to get apps. for (String appUID : listAppUIDs) { @@ -944,32 +979,20 @@ public class TestTimelineReaderWebServicesHBaseStorage { try { URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + "timeline/clusters/cluster1/flows"); - ClientResponse resp = getResponse(client, uri); - Set entities = - resp.getEntity(new GenericType>(){}); - assertNotNull(entities); - assertEquals(2, entities.size()); - for (FlowActivityEntity entity : entities) { - assertTrue((entity.getId().endsWith("@flow_name") && - entity.getFlowRuns().size() == 2) || - (entity.getId().endsWith("@flow_name2") && - entity.getFlowRuns().size() == 1)); - } + + verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, + new String[] {"flow1", "flow_name", "flow_name2"}); // Query without specifying cluster ID. uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + "timeline/flows/"); - resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType>(){}); - assertNotNull(entities); - assertEquals(2, entities.size()); + verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, + new String[] {"flow1", "flow_name", "flow_name2"}); uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + "timeline/clusters/cluster1/flows?limit=1"); - resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType>(){}); - assertNotNull(entities); - assertEquals(1, entities.size()); + verifyFlowEntites(client, uri, 1, new int[] {3}, + new String[] {"flow1"}); long firstFlowActivity = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L); @@ -979,40 +1002,25 @@ public class TestTimelineReaderWebServicesHBaseStorage { "timeline/clusters/cluster1/flows?daterange=" + fmt.format(firstFlowActivity) + "-" + fmt.format(dayTs)); - resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType>(){}); - assertNotNull(entities); - assertEquals(2, entities.size()); - for (FlowActivityEntity entity : entities) { - assertTrue((entity.getId().endsWith("@flow_name") && - entity.getFlowRuns().size() == 2) || - (entity.getId().endsWith("@flow_name2") && - entity.getFlowRuns().size() == 1)); - } + verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, + new String[] {"flow1", "flow_name", "flow_name2"}); uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=" + fmt.format(dayTs + (4*86400000L))); - resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType>(){}); - assertNotNull(entities); - assertEquals(0, entities.size()); + verifyFlowEntites(client, uri, 0, new int[] {}, new String[] {}); uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=-" + fmt.format(dayTs)); - resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType>(){}); - assertNotNull(entities); - assertEquals(2, entities.size()); + verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, + new String[] {"flow1", "flow_name", "flow_name2"}); uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=" + fmt.format(firstFlowActivity) + "-"); - resp = getResponse(client, uri); - entities = resp.getEntity(new GenericType>(){}); - assertNotNull(entities); - assertEquals(2, entities.size()); + verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, + new String[] {"flow1", "flow_name", "flow_name2"}); uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + "timeline/clusters/cluster1/flows?daterange=20150711:20150714"); @@ -2242,4 +2250,162 @@ public class TestTimelineReaderWebServicesHBaseStorage { } return entity; } + + private void verifyFlowEntites(Client client, URI uri, int noOfEntities, + int[] a, String[] flowsInSequence) throws Exception { + ClientResponse resp = getResponse(client, uri); + List entities = + resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(noOfEntities, entities.size()); + assertEquals(noOfEntities, flowsInSequence.length); + assertEquals(noOfEntities, a.length); + int count = 0; + for (FlowActivityEntity timelineEntity : entities) { + assertEquals(flowsInSequence[count], + timelineEntity.getInfo().get("SYSTEM_INFO_FLOW_NAME")); + assertEquals(a[count++], timelineEntity.getFlowRuns().size()); + } + } + + @Test + public void testForFlowAppsPagination() throws Exception { + Client client = createClient(); + try { + // app entities stored is 15 during initialization. + int totalAppEntities = 15; + String resourceUri = "http://localhost:" + serverPort + "/ws/v2/" + + "timeline/clusters/cluster1/users/user1/flows/flow1/apps"; + URI uri = URI.create(resourceUri); + ClientResponse resp = getResponse(client, uri); + List entities = + resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(totalAppEntities, entities.size()); + TimelineEntity entity1 = entities.get(0); + TimelineEntity entity15 = entities.get(totalAppEntities - 1); + + int limit = 10; + String queryParam = "?limit=" + limit; + uri = URI.create(resourceUri + queryParam); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(limit, entities.size()); + assertEquals(entity1, entities.get(0)); + TimelineEntity entity10 = entities.get(limit - 1); + + uri = + URI.create(resourceUri + queryParam + "&fromid=" + entity10.getId()); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(6, entities.size()); + assertEquals(entity10, entities.get(0)); + assertEquals(entity15, entities.get(5)); + + } finally { + client.destroy(); + } + } + + @Test + public void testForFlowRunAppsPagination() throws Exception { + Client client = createClient(); + try { + // app entities stored is 15 during initialization. + int totalAppEntities = 5; + String resourceUri = "http://localhost:" + serverPort + "/ws/v2/" + + "timeline/clusters/cluster1/users/user1/flows/flow1/runs/1/apps"; + URI uri = URI.create(resourceUri); + ClientResponse resp = getResponse(client, uri); + List entities = + resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(totalAppEntities, entities.size()); + TimelineEntity entity1 = entities.get(0); + TimelineEntity entity5 = entities.get(totalAppEntities - 1); + + int limit = 3; + String queryParam = "?limit=" + limit; + uri = URI.create(resourceUri + queryParam); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(limit, entities.size()); + assertEquals(entity1, entities.get(0)); + TimelineEntity entity3 = entities.get(limit - 1); + + uri = + URI.create(resourceUri + queryParam + "&fromid=" + entity3.getId()); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(3, entities.size()); + assertEquals(entity3, entities.get(0)); + assertEquals(entity5, entities.get(2)); + + } finally { + client.destroy(); + } + } + + @Test + public void testForFlowRunsPagination() throws Exception { + Client client = createClient(); + try { + // app entities stored is 15 during initialization. + int totalRuns = 3; + String resourceUri = "http://localhost:" + serverPort + "/ws/v2/" + + "timeline/clusters/cluster1/users/user1/flows/flow1/runs"; + URI uri = URI.create(resourceUri); + ClientResponse resp = getResponse(client, uri); + List entities = + resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(totalRuns, entities.size()); + TimelineEntity entity1 = entities.get(0); + TimelineEntity entity3 = entities.get(totalRuns - 1); + + int limit = 2; + String queryParam = "?limit=" + limit; + uri = URI.create(resourceUri + queryParam); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(limit, entities.size()); + assertEquals(entity1, entities.get(0)); + TimelineEntity entity2 = entities.get(limit - 1); + + uri = URI.create(resourceUri + queryParam + "&fromid=" + + entity2.getInfo().get("SYSTEM_INFO_FLOW_RUN_ID")); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(limit, entities.size()); + assertEquals(entity2, entities.get(0)); + assertEquals(entity3, entities.get(1)); + + uri = URI.create(resourceUri + queryParam + "&fromid=" + + entity3.getInfo().get("SYSTEM_INFO_FLOW_RUN_ID")); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(1, entities.size()); + assertEquals(entity3, entities.get(0)); + } finally { + client.destroy(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index 994c27647c8..df3ccabc9a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -1097,6 +1097,9 @@ public class TimelineReaderWebServices { * METRICS makes sense for flow runs hence only ALL or METRICS are * supported as fields for fetching flow runs. Other fields will lead to * HTTP 400 (Bad Request) response. (Optional query param). + * @param fromId Defines the flow run id. If specified, retrieve the next + * set of flow runs from the given id. The set of flow runs retrieved + * is inclusive of specified fromId. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of FlowRunEntity instances for the given flow are @@ -1118,7 +1121,8 @@ public class TimelineReaderWebServices { @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("metricstoretrieve") String metricsToRetrieve, - @QueryParam("fields") String fields) { + @QueryParam("fields") String fields, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -1140,11 +1144,12 @@ public class TimelineReaderWebServices { entities = timelineReaderManager.getEntities(context, TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, null, null, null, - null, null, null, null, null), + null, null, null, null, fromId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( null, metricsToRetrieve, fields, null)); } catch (Exception e) { - handleException(e, url, startTime, "createdTime start/end or limit"); + handleException(e, url, startTime, + "createdTime start/end or limit or fromId"); } long endTime = Time.monotonicNow(); if (entities == null) { @@ -1182,6 +1187,9 @@ public class TimelineReaderWebServices { * METRICS makes sense for flow runs hence only ALL or METRICS are * supported as fields for fetching flow runs. Other fields will lead to * HTTP 400 (Bad Request) response. (Optional query param). + * @param fromId Defines the flow run id. If specified, retrieve the next + * set of flow runs from the given id. The set of flow runs retrieved + * is inclusive of specified fromId. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of FlowRunEntity instances for the given flow are @@ -1204,9 +1212,10 @@ public class TimelineReaderWebServices { @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("metricstoretrieve") String metricsToRetrieve, - @QueryParam("fields") String fields) { + @QueryParam("fields") String fields, + @QueryParam("fromid") String fromId) { return getFlowRuns(req, res, null, userId, flowName, limit, - createdTimeStart, createdTimeEnd, metricsToRetrieve, fields); + createdTimeStart, createdTimeEnd, metricsToRetrieve, fields, fromId); } /** @@ -1237,6 +1246,9 @@ public class TimelineReaderWebServices { * METRICS makes sense for flow runs hence only ALL or METRICS are * supported as fields for fetching flow runs. Other fields will lead to * HTTP 400 (Bad Request) response. (Optional query param). + * @param fromId Defines the flow run id. If specified, retrieve the next + * set of flow runs from the given id. The set of flow runs retrieved + * is inclusive of specified fromId. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of FlowRunEntity instances for the given flow are @@ -1260,7 +1272,8 @@ public class TimelineReaderWebServices { @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("metricstoretrieve") String metricsToRetrieve, - @QueryParam("fields") String fields) { + @QueryParam("fields") String fields, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -1279,11 +1292,12 @@ public class TimelineReaderWebServices { TimelineEntityType.YARN_FLOW_RUN.toString(), null, null), TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, null, null, null, - null, null, null, null, null), + null, null, null, null, fromId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( null, metricsToRetrieve, fields, null)); } catch (Exception e) { - handleException(e, url, startTime, "createdTime start/end or limit"); + handleException(e, url, startTime, + "createdTime start/end or limit or fromId"); } long endTime = Time.monotonicNow(); if (entities == null) { @@ -1719,6 +1733,9 @@ public class TimelineReaderWebServices { * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromId Defines the application id. If specified, retrieve the next + * set of applications from the given id. The set of applications + * retrieved is inclusive of specified fromId. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances representing apps is @@ -1748,7 +1765,8 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -1771,7 +1789,7 @@ public class TimelineReaderWebServices { TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, null, - null), + fromId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -1847,6 +1865,9 @@ public class TimelineReaderWebServices { * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromId Defines the application id. If specified, retrieve the next + * set of applications from the given id. The set of applications + * retrieved is inclusive of specified fromId. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances representing apps is @@ -1878,12 +1899,13 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromid") String fromId) { return getEntities(req, res, null, null, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId); } /** @@ -1947,6 +1969,9 @@ public class TimelineReaderWebServices { * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromId Defines the application id. If specified, retrieve the next + * set of applications from the given id. The set of applications + * retrieved is inclusive of specified fromId. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances representing apps is @@ -1980,12 +2005,13 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromid") String fromId) { return getEntities(req, res, clusterId, null, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId); } /** @@ -2046,6 +2072,9 @@ public class TimelineReaderWebServices { * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromId Defines the application id. If specified, retrieve the next + * set of applications from the given id. The set of applications + * retrieved is inclusive of specified fromId. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances representing apps is @@ -2076,12 +2105,13 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromid") String fromId) { return getEntities(req, res, null, null, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId); } /** @@ -2143,6 +2173,9 @@ public class TimelineReaderWebServices { * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromId Defines the application id. If specified, retrieve the next + * set of applications from the given id. The set of applications + * retrieved is inclusive of specified fromId. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances representing apps is @@ -2174,12 +2207,13 @@ public class TimelineReaderWebServices { @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromid") String fromId) { return getEntities(req, res, clusterId, null, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId); } /** @@ -3107,4 +3141,4 @@ public class TimelineReaderWebServices { " (Took " + (endTime - startTime) + " ms.)"); return results; } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index 1667f614391..8a331c3fb49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -48,7 +48,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; @@ -359,13 +361,44 @@ class ApplicationEntityReader extends GenericEntityReader { Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); TimelineReaderContext context = getContext(); + RowKeyPrefix applicationRowKeyPrefix = null; + // Whether or not flowRunID is null doesn't matter, the // ApplicationRowKeyPrefix will do the right thing. - RowKeyPrefix applicationRowKeyPrefix = - new ApplicationRowKeyPrefix(context.getClusterId(), - context.getUserId(), context.getFlowName(), - context.getFlowRunId()); - scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix()); + // default mode, will always scans from beginning of entity type. + if (getFilters().getFromId() == null) { + applicationRowKeyPrefix = new ApplicationRowKeyPrefix( + context.getClusterId(), context.getUserId(), context.getFlowName(), + context.getFlowRunId()); + scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix()); + } else { + Long flowRunId = context.getFlowRunId(); + if (flowRunId == null) { + AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey( + context.getClusterId(), getFilters().getFromId()); + FlowContext flowContext = + lookupFlowContext(appToFlowRowKey, hbaseConf, conn); + flowRunId = flowContext.getFlowRunId(); + } + + ApplicationRowKey applicationRowKey = + new ApplicationRowKey(context.getClusterId(), context.getUserId(), + context.getFlowName(), flowRunId, getFilters().getFromId()); + + // set start row + scan.setStartRow(applicationRowKey.getRowKey()); + + // get the bytes for stop row + applicationRowKeyPrefix = new ApplicationRowKeyPrefix( + context.getClusterId(), context.getUserId(), context.getFlowName(), + context.getFlowRunId()); + + // set stop row + scan.setStopRow( + HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( + applicationRowKeyPrefix.getRowKeyPrefix())); + } + FilterList newList = new FilterList(); newList.addFilter(new PageFilter(getFilters().getLimit())); if (filterList != null && !filterList.getFilters().isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index 9b8482c54b1..cedf96a8673 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; @@ -210,10 +211,30 @@ class FlowRunEntityReader extends TimelineEntityReader { FilterList filterList) throws IOException { Scan scan = new Scan(); TimelineReaderContext context = getContext(); - RowKeyPrefix flowRunRowKeyPrefix = - new FlowRunRowKeyPrefix(context.getClusterId(), context.getUserId(), - context.getFlowName()); - scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix()); + RowKeyPrefix flowRunRowKeyPrefix = null; + if (getFilters().getFromId() == null) { + flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName()); + scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix()); + } else { + + FlowRunRowKey flowRunRowKey = + new FlowRunRowKey(context.getClusterId(), context.getUserId(), + context.getFlowName(), Long.parseLong(getFilters().getFromId())); + + // set start row + scan.setStartRow(flowRunRowKey.getRowKey()); + + // get the bytes for stop row + flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName()); + + // set stop row + scan.setStopRow( + HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( + flowRunRowKeyPrefix.getRowKeyPrefix())); + } + FilterList newList = new FilterList(); newList.addFilter(new PageFilter(getFilters().getLimit())); if (filterList != null && !filterList.getFilters().isEmpty()) {